Fix sonar warnings in sal-distributed-datastore
controller.git: opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes of producers/shards
 * newly opened on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // Helper ActorContext used only for static calls such as executeOperationAsync. For calls that need an actor
    // context tied to a specific datastore, use the one provided by that DistributedDataStore.
    private final ActorContext actorContext;
    private final ShardingServiceAddressResolver resolver;
    private final AbstractDataStore distributedConfigDatastore;
    private final AbstractDataStore distributedOperDatastore;
    private final int lookupTaskMaxRetries;

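    // Producers created on behalf of remote nodes, keyed by each subtree identifier they cover.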
    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorContext = distributedConfigDatastore.getActorContext();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
    }

    @Override
    public void preStart() {
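        // Intentionally left empty - no additional startup handling is needed.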
    }

    @Override
    protected void handleRecover(final Object message) throws Exception {
        LOG.debug("Received a recover message {}", message);
    }

    @Override
    protected void handleCommand(final Object message) throws Exception {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorContext.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }

    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }

    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });

    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), noSender());
        }
    }

    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorContext, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorContext context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }

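    /**
     * Derive the MemberName of a cluster member from its first akka cluster role, which is how member names are
     * expected to be propagated.
     */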
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }

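    /**
     * Registration of a producer created locally on behalf of a remote node. Closing it closes the underlying
     * producer and removes its subtrees from the idToProducer map.
     */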
    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }

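    /**
     * Registration pairing a DistributedShardFrontend listener registration with its client actor. Removing the
     * registration closes the shard registration and stops the client actor.
     */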
    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorContext context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorContext context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {

            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());

        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the last step once the backend is ready: checking whether we have a frontend shard for the backend.
     * The frontend should already be present by the time the backend is created, so this is just a sanity check in
     * case they race. Once this completes, the future for the cds shard creation is completed and the shard is
     * ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            return entry.getIdentifier().equals(prefix.getRootIdentifier().getLastPathArgument());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from
     * the configuration.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorContext context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorContext context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }

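    /**
     * Fluent builder for the dependencies of {@link ShardedDataTreeActor}. The sharding service, actor system,
     * cluster wrapper and both datastores must be set before {@link #props()} is called; an illustrative use is
     * {@code new ShardedDataTreeActorCreator().setShardingService(...).setActorSystem(...).props()}.
     */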
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private AbstractDataStore distributedConfigDatastore;
        private AbstractDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
            this.cluster = cluster;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public AbstractDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final AbstractDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public AbstractDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final AbstractDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}