/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.sharding;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes of
 * producers/shards newly opened on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper actorContext used only for static calls to executeAsync etc
    // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
    private final ActorUtils actorUtils;
    private final ShardingServiceAddressResolver resolver;
    private final AbstractDataStore distributedConfigDatastore;
    private final AbstractDataStore distributedOperDatastore;
    private final int lookupTaskMaxRetries;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();

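    // All collaborators come from the creator/builder; the actor subscribes itself to cluster membership
    // events right away so the peer address resolver stays current.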
    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorUtils = distributedConfigDatastore.getActorUtils();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
    }

    @Override
    public void preStart() {
    }

    @Override
    protected void handleRecover(final Object message) {
        LOG.debug("Received a recover message {}", message);
    }

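    // Membership events keep the peer address resolver up to date; the sharding messages drive producer
    // registration and prefix shard lookup, creation and removal.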
    @Override
    protected void handleCommand(final Object message) {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

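        // Ask every peer to create a matching producer; the original caller only gets a Success reply once
        // all peers have acknowledged, otherwise the failure is propagated back to it.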
        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorUtils.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }

    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }

    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), ActorRef.noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), ActorRef.noSender());
        }
    }

    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorUtils, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorUtils context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }

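    // A member's name is derived from its first configured akka cluster role.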
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }

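    /**
     * Local registration of a producer created on behalf of a remote node. Closing it closes the underlying
     * producer and drops its subtrees from {@link #idToProducer}.
     */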
    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }

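    /**
     * Registration pairing a frontend shard registration with its client actor. Closing it closes the
     * frontend registration and stops the client actor with a PoisonPill.
     */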
    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

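        // First stage of the lookup chain: wait until the local backend shard exists, then hand off to
        // ShardLeaderLookupTask, which in turn hands off to FrontendLookupTask.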
        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorUtils context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorUtils context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Once the backend is ready, this handles the last step: checking whether we have a frontend shard for the
     * backend. The frontend should already be present by the time the backend is created, so this is just a sanity
     * check in case they race. When the check succeeds, the future for the cds shard creation is completed and the
     * shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), ActorRef.noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a cds shard registration is closed and completes once the backend shard is removed
     * from the configuration.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorUtils context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), ActorRef.noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), ActorRef.noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorUtils context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), ActorRef.noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }

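    /**
     * Builder for {@link ShardedDataTreeActor}. All collaborators except the retry count must be set before
     * {@link #props()} is invoked.
     */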
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private AbstractDataStore distributedConfigDatastore;
        private AbstractDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
            this.cluster = clusterWrapper;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public AbstractDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final AbstractDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public AbstractDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final AbstractDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
            this.maxRetries = newMaxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}