Rename ActorContext to ActorUtils
[controller.git] opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks producers/shards currently open on remote nodes and notifies remote nodes
 * about producers/shards newly opened on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

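    // Fixed delay with which every lookup task below is initially scheduled and rescheduled on retry.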
    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper ActorUtils used only for static calls to executeAsync etc
    // for calls that need an ActorUtils tied to a specific datastore use the one provided by the DistributedDataStore
    private final ActorUtils actorUtils;
    private final ShardingServiceAddressResolver resolver;
    private final AbstractDataStore distributedConfigDatastore;
    private final AbstractDataStore distributedOperDatastore;
    private final int lookupTaskMaxRetries;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorUtils = distributedConfigDatastore.getActorUtils();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
    }

    @Override
    public void preStart() {
    }

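    // No state is persisted by this actor itself, so recovery only logs the replayed messages.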
    @Override
    protected void handleRecover(final Object message) {
        LOG.debug("Received a recover message {}", message);
    }

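    // Cluster membership events keep the peer address resolver up to date; the remaining messages
    // drive the producer/shard lifecycle and the lookup tasks defined further below.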
    @Override
    protected void handleCommand(final Object message) {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers - reply immediately and avoid a second reply from the combined future below
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

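        // Ask every remote sharding actor to open a matching producer for the same subtrees and collect
        // the acknowledgements so the original sender is answered only once all peers have replied.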
        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorUtils.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }

    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

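        // Create the producer on the local sharding service and index the registration by subtree so a
        // later NotifyProducerRemoved for any of these subtrees can close it again.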
        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }

    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });

    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), ActorRef.noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), ActorRef.noSender());
        }
    }

    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorUtils context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

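        // The reply for this lookup is produced by a chain of scheduled tasks: ShardCreationLookupTask
        // locates the local backend shard, ShardLeaderLookupTask waits for its leader to be elected and
        // FrontendLookupTask finally confirms that the frontend registration is in place.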
        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorUtils, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorUtils context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }

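    // The member name is conveyed as the first Akka cluster role of the member.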
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }

    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }

    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Handles the lookup step of CDS shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorUtils context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorUtils context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {

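            // Ask the shard backend who its current leader is; readiness requires an elected leader.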
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());

        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready this handles the last step - checking whether a frontend shard exists for the
     * backend. The frontend should already be present by the time the backend is created, so this is just a sanity
     * check in case the two race. Once it completes, the future for the CDS shard creation is completed and the
     * shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), ActorRef.noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
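            // The empty root prefix matches any entry; otherwise the entry identifier must equal the
            // last path argument of the prefix being looked up.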
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a CDS shard registration is closed; it completes once the backend shard has been
     * removed from the configuration.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorUtils context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), ActorRef.noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), ActorRef.noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorUtils context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), ActorRef.noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }

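    /**
     * Builder for {@link ShardedDataTreeActor} {@link Props}. All dependencies except the retry count are
     * verified to be non-null when {@link #props()} is called.
     */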
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private AbstractDataStore distributedConfigDatastore;
        private AbstractDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
            this.cluster = clusterWrapper;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public AbstractDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final AbstractDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public AbstractDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final AbstractDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
            this.maxRetries = newMaxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}