BUG-2138: Use correct actor context in shard lookup.
controller.git: opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.Status.Failure;
20 import akka.actor.Status.Success;
21 import akka.cluster.Cluster;
22 import akka.cluster.ClusterEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.MemberWeaklyUp;
27 import akka.cluster.ClusterEvent.ReachableMember;
28 import akka.cluster.ClusterEvent.UnreachableMember;
29 import akka.cluster.Member;
30 import akka.cluster.ddata.DistributedData;
31 import akka.cluster.ddata.ORMap;
32 import akka.cluster.ddata.Replicator;
33 import akka.cluster.ddata.Replicator.Changed;
34 import akka.cluster.ddata.Replicator.Subscribe;
35 import akka.cluster.ddata.Replicator.Update;
36 import akka.dispatch.OnComplete;
37 import akka.pattern.Patterns;
38 import akka.util.Timeout;
39 import com.google.common.base.Optional;
40 import com.google.common.base.Preconditions;
41 import com.google.common.collect.Sets;
42 import com.google.common.collect.Sets.SetView;
43 import java.util.ArrayList;
44 import java.util.Collection;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.concurrent.CompletableFuture;
49 import java.util.concurrent.TimeUnit;
50 import java.util.function.Function;
51 import java.util.stream.Collectors;
52 import javax.annotation.Nullable;
53 import org.opendaylight.controller.cluster.access.concepts.MemberName;
54 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
55 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
56 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
57 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
58 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
59 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
60 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
61 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
62 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
63 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
64 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
65 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
66 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
67 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
68 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
69 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
74 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
75 import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
76 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
77 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
78 import org.opendaylight.yangtools.concepts.ListenerRegistration;
79 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82 import scala.compat.java8.FutureConverters;
83 import scala.concurrent.Future;
84 import scala.concurrent.duration.FiniteDuration;
85
86 /**
87  * Actor that tracks producers/shards currently open on remote nodes and notifies remote nodes of
88  * producers/shards newly opened on the local node.
89  */
90 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
91
92     private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
93
94     private static final String PERSISTENCE_ID = "sharding-service-actor";
95     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
96
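    // The lookup tasks below poll once per SHARD_LOOKUP_TASK_INTERVAL and give up after LOOKUP_TASK_MAX_RETRIES
    // attempts, so a single lookup may take roughly 100 seconds before the caller's future is failed.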
97     static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
98     static final int LOOKUP_TASK_MAX_RETRIES = 100;
99
100     private final DistributedShardedDOMDataTree shardingService;
101     private final ActorSystem actorSystem;
102     private final ClusterWrapper clusterWrapper;
103     // helper actorContext used only for datastore-agnostic calls such as executeOperationAsync()
104     // calls that need an actor context tied to a specific datastore must use the one provided by that DistributedDataStore
105     private final ActorContext actorContext;
106     private final ShardingServiceAddressResolver resolver;
107     private final DistributedDataStore distributedConfigDatastore;
108     private final DistributedDataStore distributedOperDatastore;
109
110     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
111     private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
112
113     private final Cluster cluster;
114     private final ActorRef replicator;
115
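    // Last seen prefix-shard configuration replicated via Akka Distributed Data, kept both in its raw ORMap form
    // (used as the base for further updates) and decoded into a map keyed by shard prefix.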
116     private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
117     private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
118
119     ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
120         LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
121
122         shardingService = builder.getShardingService();
123         actorSystem = builder.getActorSystem();
124         clusterWrapper = builder.getClusterWrapper();
125         distributedConfigDatastore = builder.getDistributedConfigDatastore();
126         distributedOperDatastore = builder.getDistributedOperDatastore();
127         actorContext = distributedConfigDatastore.getActorContext();
128         resolver = new ShardingServiceAddressResolver(
129                 DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
130
131         clusterWrapper.subscribeToMemberEvents(self());
132         cluster = Cluster.get(actorSystem);
133
134         replicator = DistributedData.get(context().system()).replicator();
135     }
136
137     @Override
138     public void preStart() {
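        // Subscribe to the replicated configuration map; updates arrive at this actor as Replicator.Changed
        // messages and are handled in onConfigChanged().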
139         final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
140                 new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
141         replicator.tell(subscribe, noSender());
142     }
143
144     @Override
145     protected void handleRecover(final Object message) throws Exception {
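        // This actor never calls persist(), so recovery only logs whatever (if anything) is replayed.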
146         LOG.debug("Received a recover message {}", message);
147     }
148
149     @Override
150     protected void handleCommand(final Object message) throws Exception {
151         LOG.debug("Received {}", message);
152         if (message instanceof ClusterEvent.MemberUp) {
153             memberUp((ClusterEvent.MemberUp) message);
154         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
155             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
156         } else if (message instanceof ClusterEvent.MemberExited) {
157             memberExited((ClusterEvent.MemberExited) message);
158         } else if (message instanceof ClusterEvent.MemberRemoved) {
159             memberRemoved((ClusterEvent.MemberRemoved) message);
160         } else if (message instanceof ClusterEvent.UnreachableMember) {
161             memberUnreachable((ClusterEvent.UnreachableMember) message);
162         } else if (message instanceof ClusterEvent.ReachableMember) {
163             memberReachable((ClusterEvent.ReachableMember) message);
164         } else if (message instanceof Changed) {
165             onConfigChanged((Changed) message);
166         } else if (message instanceof ProducerCreated) {
167             onProducerCreated((ProducerCreated) message);
168         } else if (message instanceof NotifyProducerCreated) {
169             onNotifyProducerCreated((NotifyProducerCreated) message);
170         } else if (message instanceof ProducerRemoved) {
171             onProducerRemoved((ProducerRemoved) message);
172         } else if (message instanceof NotifyProducerRemoved) {
173             onNotifyProducerRemoved((NotifyProducerRemoved) message);
174         } else if (message instanceof PrefixShardCreated) {
175             onPrefixShardCreated((PrefixShardCreated) message);
176         } else if (message instanceof CreatePrefixShard) {
177             onCreatePrefixShard((CreatePrefixShard) message);
178         } else if (message instanceof RemovePrefixShard) {
179             onRemovePrefixShard((RemovePrefixShard) message);
180         } else if (message instanceof PrefixShardRemoved) {
181             onPrefixShardRemoved((PrefixShardRemoved) message);
182         }
183     }
184
185     private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
186         LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
187
188         currentData = change.dataValue();
189         final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
190
191         LOG.debug("Changed set {}", changedConfig);
192
193         try {
194             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
195                     changedConfig.values().stream().collect(
196                             Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
197             resolveConfig(newConfig);
198         } catch (final IllegalStateException e) {
199             LOG.error("Failed to resolve new shard configuration", e);
200         }
201
202     }
203
204     private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
205
206         // get the removed configurations
207         final SetView<DOMDataTreeIdentifier> deleted =
208                 Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
209         shardingService.resolveShardRemovals(deleted);
210
211         // get the added configurations
212         final SetView<DOMDataTreeIdentifier> additions =
213                 Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
214         shardingService.resolveShardAdditions(additions);
215         // we can ignore those that existed previously since the potential changes in replicas will be handled by
216         // the shard manager.
217
218         currentConfiguration = new HashMap<>(newConfig);
219     }
220
221     @Override
222     public String persistenceId() {
223         return PERSISTENCE_ID;
224     }
225
226     private void memberUp(final MemberUp message) {
227         final MemberName memberName = memberToName(message.member());
228
229         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
230                 message.member().address());
231
232         resolver.addPeerAddress(memberName, message.member().address());
233     }
234
235     private void memberWeaklyUp(final MemberWeaklyUp message) {
236         final MemberName memberName = memberToName(message.member());
237
238         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
239                 message.member().address());
240
241         resolver.addPeerAddress(memberName, message.member().address());
242     }
243
244     private void memberExited(final MemberExited message) {
245         final MemberName memberName = memberToName(message.member());
246
247         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
248                 message.member().address());
249
250         resolver.removePeerAddress(memberName);
251     }
252
253     private void memberRemoved(final MemberRemoved message) {
254         final MemberName memberName = memberToName(message.member());
255
256         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
257                 message.member().address());
258
259         resolver.removePeerAddress(memberName);
260     }
261
262     private void memberUnreachable(final UnreachableMember message) {
263         final MemberName memberName = memberToName(message.member());
264         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
265
266         resolver.removePeerAddress(memberName);
267     }
268
269     private void memberReachable(final ReachableMember message) {
270         final MemberName memberName = memberToName(message.member());
271         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
272
273         resolver.addPeerAddress(memberName, message.member().address());
274     }
275
276     private void onProducerCreated(final ProducerCreated message) {
277         LOG.debug("Received ProducerCreated: {}", message);
278
279         // fastpath if we have no peers
280         if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
281             getSender().tell(new Status.Success(null), noSender());
            return;
282         }
283
284         final ActorRef sender = getSender();
285         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
286
287         final List<CompletableFuture<Object>> futures = new ArrayList<>();
288
289         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
290             final ActorSelection actorSelection = actorSystem.actorSelection(address);
291             futures.add(
292                     FutureConverters.toJava(
293                             actorContext.executeOperationAsync(
294                                     actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
295                     .toCompletableFuture());
296         }
297
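        // Reply to the original sender only once every peer has acknowledged the new producer.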
298         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
299                 futures.toArray(new CompletableFuture[futures.size()]));
300
301         combinedFuture.thenRun(() -> {
302             sender.tell(new Status.Success(null), noSender());
303         }).exceptionally(throwable -> {
304             sender.tell(new Status.Failure(throwable), self());
305             return null;
306         });
307     }
308
309     private void onNotifyProducerCreated(final NotifyProducerCreated message) {
310         LOG.debug("Received NotifyProducerCreated: {}", message);
311
312         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
313
314         try {
315             final ActorProducerRegistration registration =
316                     new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
317             subtrees.forEach(id -> idToProducer.put(id, registration));
318             sender().tell(new Status.Success(null), self());
319         } catch (final IllegalArgumentException e) {
320             sender().tell(new Status.Failure(e), getSelf());
321         }
322     }
323
324     private void onProducerRemoved(final ProducerRemoved message) {
325         LOG.debug("Received ProducerRemoved: {}", message);
326
327         final List<CompletableFuture<Object>> futures = new ArrayList<>();
328
329         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
330             final ActorSelection selection = actorSystem.actorSelection(address);
331
332             futures.add(FutureConverters.toJava(
333                     actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
334                     .toCompletableFuture());
335         }
336
337         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
338                 futures.toArray(new CompletableFuture[futures.size()]));
339
340         final ActorRef respondTo = getSender();
341
342         combinedFuture
343                 .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
344                 .exceptionally(e -> {
345                     respondTo.tell(new Status.Failure(e), self());
346                     return null;
347                 });
348
349     }
350
351     private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
352         LOG.debug("Received NotifyProducerRemoved: {}", message);
353
354         final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
355         if (registration == null) {
356             LOG.warn("The notification contained a path on which no producer is registered, throwing away");
357             getSender().tell(new Status.Success(null), noSender());
358             return;
359         }
360
361         try {
362             registration.close();
363             getSender().tell(new Status.Success(null), noSender());
364         } catch (final DOMDataTreeProducerException e) {
365             LOG.error("Unable to close producer", e);
366             getSender().tell(new Status.Failure(e), noSender());
367         }
368     }
369
370     @SuppressWarnings("checkstyle:IllegalCatch")
371     private void onCreatePrefixShard(final CreatePrefixShard message) {
372         LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
373
374         final PrefixShardConfiguration configuration = message.getConfiguration();
375
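        // Record the new shard configuration in the replicated map. WriteLocal updates only the local replica;
        // the change reaches other members via replication, which in turn fires onConfigChanged() on their
        // ShardedDataTreeActors.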
376         final Update<ORMap<PrefixShardConfiguration>> update =
377                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
378                     map -> map.put(cluster, configuration.toDataMapKey(), configuration));
379
380         replicator.tell(update, self());
381
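        // Pick the actor context of the datastore the shard belongs to (config vs. operational) so the lookup
        // below is performed against the correct datastore.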
382         final ActorContext context =
383                 configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
384                         ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
385
386         // schedule a notification task for the reply
387         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
388                 new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
389                         context, shardingService, configuration.getPrefix()),
390                 actorSystem.dispatcher());
391     }
392
393     private void onPrefixShardCreated(final PrefixShardCreated message) {
394         LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
395
396         final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
397         final ActorRef sender = getSender();
398
399         final List<CompletableFuture<Object>> futures = new ArrayList<>();
400
401         for (final String address : addresses) {
402             final ActorSelection actorSelection = actorSystem.actorSelection(address);
403             futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
404                     new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
405         }
406
407         final CompletableFuture<Void> combinedFuture =
408                 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
409
410         combinedFuture.thenRun(() -> {
411             sender.tell(new Status.Success(null), self());
412         }).exceptionally(throwable -> {
413             sender.tell(new Status.Failure(throwable), self());
414             return null;
415         });
416     }
417
418     private void onRemovePrefixShard(final RemovePrefixShard message) {
419         LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
420
421         //TODO the removal message should have the configuration or some other way to get to the key
422         final Update<ORMap<PrefixShardConfiguration>> removal =
423                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
424                     map -> map.remove(cluster, "prefix=" + message.getPrefix()));
425         replicator.tell(removal, self());
426
427         final ShardRemovalLookupTask removalTask =
428                 new ShardRemovalLookupTask(actorSystem, getSender(),
429                         actorContext, message.getPrefix());
430
431         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
432     }
433
434     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
435         LOG.debug("Received PrefixShardRemoved: {}", message);
436
437         final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
438
439         if (registration == null) {
440             LOG.warn("Received shard removed for {}, but no shard is registered at this prefix. All registrations: {}",
441                     message.getPrefix(), idToShardRegistration);
442             return;
443         }
444
445         registration.close();
446     }
447
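    // The member name is conveyed as the first akka cluster role of the member.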
448     private static MemberName memberToName(final Member member) {
449         return MemberName.forName(member.roles().iterator().next());
450     }
451
452     private class ActorProducerRegistration {
453
454         private final DOMDataTreeProducer producer;
455         private final Collection<DOMDataTreeIdentifier> subtrees;
456
457         ActorProducerRegistration(final DOMDataTreeProducer producer,
458                                   final Collection<DOMDataTreeIdentifier> subtrees) {
459             this.producer = producer;
460             this.subtrees = subtrees;
461         }
462
463         void close() throws DOMDataTreeProducerException {
464             producer.close();
465             subtrees.forEach(idToProducer::remove);
466         }
467     }
468
469     private static class ShardFrontendRegistration extends
470             AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {
471
472         private final ActorRef clientActor;
473         private final ListenerRegistration<DistributedShardFrontend> shardRegistration;
474
475         ShardFrontendRegistration(final ActorRef clientActor,
476                                   final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
477             super(shardRegistration);
478             this.clientActor = clientActor;
479             this.shardRegistration = shardRegistration;
480         }
481
482         @Override
483         protected void removeRegistration() {
484             shardRegistration.close();
485             clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
486         }
487     }
488
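    /**
     * Base class for the scheduled lookup tasks below. Each run either succeeds, reschedules itself (up to
     * LOOKUP_TASK_MAX_RETRIES attempts) or replies to the requester with a Status.Failure wrapping
     * a DOMDataTreeShardCreationFailedException.
     */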
489     private abstract static class LookupTask implements Runnable {
490
491         private final ActorRef replyTo;
492         private int retries = 0;
493
494         private LookupTask(final ActorRef replyTo) {
495             this.replyTo = replyTo;
496         }
497
498         abstract void reschedule(int retries);
499
500         void tryReschedule(@Nullable final Throwable throwable) {
501             if (retries <= LOOKUP_TASK_MAX_RETRIES) {
502                 retries++;
503                 reschedule(retries);
504             } else {
505                 fail(throwable);
506             }
507         }
508
509         void fail(@Nullable final Throwable throwable) {
510             if (throwable == null) {
511                 replyTo.tell(new Failure(
512                         new DOMDataTreeShardCreationFailedException("Unable to find the backend shard. "
513                                 + "Failing..")), noSender());
514             } else {
515                 replyTo.tell(new Failure(
516                         new DOMDataTreeShardCreationFailedException("Unable to find the backend shard. "
517                                 + "Failing..", throwable)), noSender());
518             }
519         }
520     }
521
522     /**
523      * Handles the lookup step of cds shard creation once the configuration is updated.
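     * On success it chains into ShardLeaderLookupTask and then FrontendLookupTask, which completes the caller's future.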
524      */
525     private static class ShardCreationLookupTask extends LookupTask {
526
527         private final ActorSystem system;
528         private final ActorRef replyTo;
529         private final ClusterWrapper clusterWrapper;
530         private final ActorContext context;
531         private final DistributedShardedDOMDataTree shardingService;
532         private final DOMDataTreeIdentifier toLookup;
533
534         ShardCreationLookupTask(final ActorSystem system,
535                                 final ActorRef replyTo,
536                                 final ClusterWrapper clusterWrapper,
537                                 final ActorContext context,
538                                 final DistributedShardedDOMDataTree shardingService,
539                                 final DOMDataTreeIdentifier toLookup) {
540             super(replyTo);
541             this.system = system;
542             this.replyTo = replyTo;
543             this.clusterWrapper = clusterWrapper;
544             this.context = context;
545             this.shardingService = shardingService;
546             this.toLookup = toLookup;
547         }
548
549         @Override
550         public void run() {
551             final Future<ActorRef> localShardFuture =
552                     context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
553
554             localShardFuture.onComplete(new OnComplete<ActorRef>() {
555                 @Override
556                 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
557                     if (throwable != null) {
558                         tryReschedule(throwable);
559                     } else {
560                         LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
561
562                         system.scheduler().scheduleOnce(
563                                 SHARD_LOOKUP_TASK_INTERVAL,
564                                 new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
565                                         shardingService, toLookup),
566                                 system.dispatcher());
567                     }
568                 }
569             }, system.dispatcher());
570         }
571
572         @Override
573         void reschedule(int retries) {
574             LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
575             system.scheduler().scheduleOnce(
576                     SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
577         }
578     }
579
580     /**
581      * Handles the readiness step by waiting for a leader of the created shard.
582      */
583     private static class ShardLeaderLookupTask extends LookupTask {
584
585         private final ActorSystem system;
586         private final ActorRef replyTo;
587         private final ActorContext context;
588         private final ClusterWrapper clusterWrapper;
589         private final ActorRef shard;
590         private final DistributedShardedDOMDataTree shardingService;
591         private final DOMDataTreeIdentifier toLookup;
592
593         ShardLeaderLookupTask(final ActorSystem system,
594                               final ActorRef replyTo,
595                               final ActorContext context,
596                               final ClusterWrapper clusterWrapper,
597                               final ActorRef shard,
598                               final DistributedShardedDOMDataTree shardingService,
599                               final DOMDataTreeIdentifier toLookup) {
600             super(replyTo);
601             this.system = system;
602             this.replyTo = replyTo;
603             this.context = context;
604             this.clusterWrapper = clusterWrapper;
605             this.shard = shard;
606             this.shardingService = shardingService;
607             this.toLookup = toLookup;
608         }
609
610         @Override
611         public void run() {
612
613             final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
614
615             ask.onComplete(new OnComplete<Object>() {
616                 @Override
617                 public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
618                     if (throwable != null) {
619                         tryReschedule(throwable);
620                     } else {
621                         final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
622                         final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
623                         if (leaderActor.isPresent()) {
624                             // leader is found, backend seems ready, check if the frontend is ready
625                             LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
626                                     clusterWrapper.getCurrentMemberName(), toLookup);
627                             system.scheduler().scheduleOnce(
628                                     SHARD_LOOKUP_TASK_INTERVAL,
629                                     new FrontendLookupTask(system, replyTo, shardingService, toLookup),
630                                     system.dispatcher());
631                         } else {
632                             tryReschedule(null);
633                         }
634                     }
635                 }
636             }, system.dispatcher());
637
638         }
639
640         @Override
641         void reschedule(int retries) {
642             LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
643                     clusterWrapper.getCurrentMemberName(), toLookup, retries);
644             system.scheduler().scheduleOnce(
645                     SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
646         }
647     }
648
649     /**
650      * After the backend is ready this handles the last step - checking that a frontend shard exists for it. The
651      * frontend should already exist by the time the backend is created, so this is just a sanity check in case they
652      * race. Once the check succeeds the future for the cds shard creation is completed and the shard is ready for use.
653      */
654     private static final class FrontendLookupTask extends LookupTask {
655
656         private final ActorSystem system;
657         private final ActorRef replyTo;
658         private final DistributedShardedDOMDataTree shardingService;
659         private final DOMDataTreeIdentifier toLookup;
660
661         FrontendLookupTask(final ActorSystem system,
662                            final ActorRef replyTo,
663                            final DistributedShardedDOMDataTree shardingService,
664                            final DOMDataTreeIdentifier toLookup) {
665             super(replyTo);
666             this.system = system;
667             this.replyTo = replyTo;
668             this.shardingService = shardingService;
669             this.toLookup = toLookup;
670         }
671
672         @Override
673         public void run() {
674             final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
675                     shardingService.lookupShardFrontend(toLookup);
676
677             if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
678                 replyTo.tell(new Success(null), noSender());
679             } else {
680                 tryReschedule(null);
681             }
682         }
683
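        // Sanity check that the frontend table entry returned for the prefix really corresponds to the shard
        // being looked up.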
684         private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
685                                           final DOMDataTreeIdentifier prefix) {
686             if (entry == null) {
687                 return false;
688             }
689
690             if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
691                 return true;
692             }
693
694             if (entry.getIdentifier().equals(prefix.getRootIdentifier().getLastPathArgument())) {
695                 return true;
696             }
697
698             return false;
699         }
700
701         @Override
702         void reschedule(int retries) {
703             LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
704             system.scheduler().scheduleOnce(
705                     SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
706         }
707     }
708
709     /**
710      * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the
711      * configuration.
712      */
713     private static class ShardRemovalLookupTask extends LookupTask {
714
715         private final ActorSystem system;
716         private final ActorRef replyTo;
717         private final ActorContext context;
718         private final DOMDataTreeIdentifier toLookup;
719
720         ShardRemovalLookupTask(final ActorSystem system,
721                                final ActorRef replyTo,
722                                final ActorContext context,
723                                final DOMDataTreeIdentifier toLookup) {
724             super(replyTo);
725             this.system = system;
726             this.replyTo = replyTo;
727             this.context = context;
728             this.toLookup = toLookup;
729         }
730
731         @Override
732         public void run() {
733             final Future<ActorRef> localShardFuture =
734                     context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
735
736             localShardFuture.onComplete(new OnComplete<ActorRef>() {
737                 @Override
738                 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
739                     if (throwable != null) {
740                         //TODO Shouldn't we check why findLocalShard failed?
741                         LOG.debug("Backend shard[{}] removal lookup successful, notifying the registration future",
742                                 toLookup);
743                         replyTo.tell(new Success(null), noSender());
744                     } else {
745                         tryReschedule(null);
746                     }
747                 }
748             }, system.dispatcher());
749         }
750
751         @Override
752         void reschedule(int retries) {
753             LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
754                     toLookup, retries);
755             system.scheduler().scheduleOnce(
756                     SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
757         }
758     }
759
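    /**
     * Builder for this actor. A rough usage sketch (assumed here for illustration, the actual wiring lives in
     * DistributedShardedDOMDataTree):
     *
     * <pre>
     *     actorSystem.actorOf(new ShardedDataTreeActorCreator()
     *             .setShardingService(shardingService)
     *             .setActorSystem(actorSystem)
     *             .setClusterWrapper(clusterWrapper)
     *             .setDistributedConfigDatastore(configDatastore)
     *             .setDistributedOperDatastore(operDatastore)
     *             .props(), DistributedShardedDOMDataTree.ACTOR_ID);
     * </pre>
     */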
760     public static class ShardedDataTreeActorCreator {
761
762         private DistributedShardedDOMDataTree shardingService;
763         private DistributedDataStore distributedConfigDatastore;
764         private DistributedDataStore distributedOperDatastore;
765         private ActorSystem actorSystem;
766         private ClusterWrapper cluster;
767
768         public DistributedShardedDOMDataTree getShardingService() {
769             return shardingService;
770         }
771
772         public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
773             this.shardingService = shardingService;
774             return this;
775         }
776
777         public ActorSystem getActorSystem() {
778             return actorSystem;
779         }
780
781         public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
782             this.actorSystem = actorSystem;
783             return this;
784         }
785
786         public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
787             this.cluster = cluster;
788             return this;
789         }
790
791         public ClusterWrapper getClusterWrapper() {
792             return cluster;
793         }
794
795         public DistributedDataStore getDistributedConfigDatastore() {
796             return distributedConfigDatastore;
797         }
798
799         public ShardedDataTreeActorCreator setDistributedConfigDatastore(
800                 final DistributedDataStore distributedConfigDatastore) {
801             this.distributedConfigDatastore = distributedConfigDatastore;
802             return this;
803         }
804
805         public DistributedDataStore getDistributedOperDatastore() {
806             return distributedOperDatastore;
807         }
808
809         public ShardedDataTreeActorCreator setDistributedOperDatastore(
810                 final DistributedDataStore distributedOperDatastore) {
811             this.distributedOperDatastore = distributedOperDatastore;
812             return this;
813         }
814
815         private void verify() {
816             Preconditions.checkNotNull(shardingService);
817             Preconditions.checkNotNull(actorSystem);
818             Preconditions.checkNotNull(cluster);
819             Preconditions.checkNotNull(distributedConfigDatastore);
820             Preconditions.checkNotNull(distributedOperDatastore);
821         }
822
823         public Props props() {
824             verify();
825             return Props.create(ShardedDataTreeActor.class, this);
826         }
827     }
828 }