BUG-2138: DistributedShardListeners support for nested shards
[controller.git] opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.Status.Failure;
20 import akka.actor.Status.Success;
21 import akka.cluster.Cluster;
22 import akka.cluster.ClusterEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.MemberWeaklyUp;
27 import akka.cluster.ClusterEvent.ReachableMember;
28 import akka.cluster.ClusterEvent.UnreachableMember;
29 import akka.cluster.Member;
30 import akka.cluster.ddata.DistributedData;
31 import akka.cluster.ddata.ORMap;
32 import akka.cluster.ddata.Replicator;
33 import akka.cluster.ddata.Replicator.Changed;
34 import akka.cluster.ddata.Replicator.Subscribe;
35 import akka.cluster.ddata.Replicator.Update;
36 import akka.dispatch.OnComplete;
37 import akka.pattern.Patterns;
38 import akka.util.Timeout;
40 import com.google.common.base.Preconditions;
41 import com.google.common.collect.Sets;
42 import com.google.common.collect.Sets.SetView;
43 import java.util.ArrayList;
44 import java.util.Collection;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.concurrent.CompletableFuture;
49 import java.util.concurrent.TimeUnit;
50 import java.util.function.Function;
51 import java.util.stream.Collectors;
52 import javax.annotation.Nullable;
53 import org.opendaylight.controller.cluster.access.concepts.MemberName;
54 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
55 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
56 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
57 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
58 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
59 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
60 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
61 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
62 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
63 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
64 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
65 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
66 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
67 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
68 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
69 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
74 import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
75 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
76 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
77 import org.opendaylight.yangtools.concepts.ListenerRegistration;
78 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81 import scala.compat.java8.FutureConverters;
82 import scala.concurrent.Future;
83 import scala.concurrent.duration.FiniteDuration;
84
85 /**
86  * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes of
87  * newly opened producers/shards on the local node.
88  */
89 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
90
91     private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
92
93     private static final String PERSISTENCE_ID = "sharding-service-actor";
94     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
95
96     static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
97     static final int LOOKUP_TASK_MAX_RETRIES = 100;
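    // With the values above a lookup task is retried roughly once per second for up to LOOKUP_TASK_MAX_RETRIES
    // attempts, so a shard lookup gives up after about 100 seconds.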
98
99     private final DistributedShardedDOMDataTree shardingService;
100     private final ActorSystem actorSystem;
101     private final ClusterWrapper clusterWrapper;
102     // Helper actorContext used only for static calls such as executeOperationAsync.
103     // Calls that need an actor context tied to a specific datastore should use the one provided by that DistributedDataStore.
104     private final ActorContext actorContext;
105     private final ShardingServiceAddressResolver resolver;
106     private final DistributedDataStore distributedConfigDatastore;
107     private final DistributedDataStore distributedOperDatastore;
108
109     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
110     private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
111
112     private final Cluster cluster;
113     private final ActorRef replicator;
114
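    // currentData mirrors the last ORMap received from the replicator and is used as the base value for CRDT
    // updates; currentConfiguration is the locally resolved view of that map, keyed by shard prefix.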
115     private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
116     private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
117
118     ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
119         LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
120
121         shardingService = builder.getShardingService();
122         actorSystem = builder.getActorSystem();
123         clusterWrapper = builder.getClusterWrapper();
124         distributedConfigDatastore = builder.getDistributedConfigDatastore();
125         distributedOperDatastore = builder.getDistributedOperDatastore();
126         actorContext = distributedConfigDatastore.getActorContext();
127         resolver = new ShardingServiceAddressResolver(
128                 DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
129
130         clusterWrapper.subscribeToMemberEvents(self());
131         cluster = Cluster.get(actorSystem);
132
133         replicator = DistributedData.get(context().system()).replicator();
134     }
135
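    // preStart() subscribes this actor to the replicated prefix-shard configuration map; the replicator then
    // delivers every update as a Replicator.Changed message, which handleCommand() routes to onConfigChanged().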
136     @Override
137     public void preStart() {
138         final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
139                 new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
140         replicator.tell(subscribe, noSender());
141     }
142
143     @Override
144     protected void handleRecover(final Object message) throws Exception {
145         LOG.debug("Received a recover message {}", message);
146     }
147
148     @Override
149     protected void handleCommand(final Object message) throws Exception {
150         LOG.debug("Received {}", message);
151         if (message instanceof ClusterEvent.MemberUp) {
152             memberUp((ClusterEvent.MemberUp) message);
153         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
154             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
155         } else if (message instanceof ClusterEvent.MemberExited) {
156             memberExited((ClusterEvent.MemberExited) message);
157         } else if (message instanceof ClusterEvent.MemberRemoved) {
158             memberRemoved((ClusterEvent.MemberRemoved) message);
159         } else if (message instanceof ClusterEvent.UnreachableMember) {
160             memberUnreachable((ClusterEvent.UnreachableMember) message);
161         } else if (message instanceof ClusterEvent.ReachableMember) {
162             memberReachable((ClusterEvent.ReachableMember) message);
163         } else if (message instanceof Changed) {
164             onConfigChanged((Changed) message);
165         } else if (message instanceof ProducerCreated) {
166             onProducerCreated((ProducerCreated) message);
167         } else if (message instanceof NotifyProducerCreated) {
168             onNotifyProducerCreated((NotifyProducerCreated) message);
169         } else if (message instanceof ProducerRemoved) {
170             onProducerRemoved((ProducerRemoved) message);
171         } else if (message instanceof NotifyProducerRemoved) {
172             onNotifyProducerRemoved((NotifyProducerRemoved) message);
173         } else if (message instanceof PrefixShardCreated) {
174             onPrefixShardCreated((PrefixShardCreated) message);
175         } else if (message instanceof CreatePrefixShard) {
176             onCreatePrefixShard((CreatePrefixShard) message);
177         } else if (message instanceof RemovePrefixShard) {
178             onRemovePrefixShard((RemovePrefixShard) message);
179         } else if (message instanceof PrefixShardRemoved) {
180             onPrefixShardRemoved((PrefixShardRemoved) message);
181         }
182     }
183
184     private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
185         LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
186
187         currentData = change.dataValue();
188         final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
189
190         LOG.debug("Changed set {}", changedConfig);
191
192         try {
193             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
194                     changedConfig.values().stream().collect(
195                             Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
196             resolveConfig(newConfig);
197         } catch (final IllegalStateException e) {
198             LOG.error("Failed to resolve the changed shard configuration", e);
199         }
200
201     }
202
203     private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
204
205         // get the removed configurations
206         final SetView<DOMDataTreeIdentifier> deleted =
207                 Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
208         shardingService.resolveShardRemovals(deleted);
209
210         // get the added configurations
211         final SetView<DOMDataTreeIdentifier> additions =
212                 Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
213         shardingService.resolveShardAdditions(additions);
214         // we can ignore those that existed previously since the potential changes in replicas will be handled by
215         // shard manager.
216
217         currentConfiguration = new HashMap<>(newConfig);
218     }
219
220     @Override
221     public String persistenceId() {
222         return PERSISTENCE_ID;
223     }
224
225     private void memberUp(final MemberUp message) {
226         final MemberName memberName = memberToName(message.member());
227
228         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
229                 message.member().address());
230
231         resolver.addPeerAddress(memberName, message.member().address());
232     }
233
234     private void memberWeaklyUp(final MemberWeaklyUp message) {
235         final MemberName memberName = memberToName(message.member());
236
237         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
238                 message.member().address());
239
240         resolver.addPeerAddress(memberName, message.member().address());
241     }
242
243     private void memberExited(final MemberExited message) {
244         final MemberName memberName = memberToName(message.member());
245
246         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
247                 message.member().address());
248
249         resolver.removePeerAddress(memberName);
250     }
251
252     private void memberRemoved(final MemberRemoved message) {
253         final MemberName memberName = memberToName(message.member());
254
255         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
256                 message.member().address());
257
258         resolver.removePeerAddress(memberName);
259     }
260
261     private void memberUnreachable(final UnreachableMember message) {
262         final MemberName memberName = memberToName(message.member());
263         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
264
265         resolver.removePeerAddress(memberName);
266     }
267
268     private void memberReachable(final ReachableMember message) {
269         final MemberName memberName = memberToName(message.member());
270         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
271
272         resolver.addPeerAddress(memberName, message.member().address());
273     }
274
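    // Producer creation/removal is a simple fan-out: the local node asks every known peer sharding actor to
    // register (or close) the producer and replies to the original sender only once all of the asks complete.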
275     private void onProducerCreated(final ProducerCreated message) {
276         LOG.debug("Received ProducerCreated: {}", message);
277
278         // fastpath if we have no peers
279         if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
280             getSender().tell(new Status.Success(null), noSender());
281             getSender().tell(new Status.Success(null), noSender());
                return;
282         }
283         final ActorRef sender = getSender();
284         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
285
286         final List<CompletableFuture<Object>> futures = new ArrayList<>();
287
288         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
289             final ActorSelection actorSelection = actorSystem.actorSelection(address);
290             futures.add(
291                     FutureConverters.toJava(
292                             actorContext.executeOperationAsync(
293                                     actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
294                     .toCompletableFuture());
295         }
296
297         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
298                 futures.toArray(new CompletableFuture[futures.size()]));
299
300         combinedFuture.thenRun(() -> {
301             sender.tell(new Status.Success(null), noSender());
302         }).exceptionally(throwable -> {
303             sender.tell(new Status.Failure(throwable), self());
304             return null;
305         });
306     }
307
308     private void onNotifyProducerCreated(final NotifyProducerCreated message) {
309         LOG.debug("Received NotifyProducerCreated: {}", message);
310
311         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
312
313         try {
314             final ActorProducerRegistration registration =
315                     new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
316             subtrees.forEach(id -> idToProducer.put(id, registration));
317             sender().tell(new Status.Success(null), self());
318         } catch (final IllegalArgumentException e) {
319             sender().tell(new Status.Failure(e), getSelf());
320         }
321     }
322
323     private void onProducerRemoved(final ProducerRemoved message) {
324         LOG.debug("Received ProducerRemoved: {}", message);
325
326         final List<CompletableFuture<Object>> futures = new ArrayList<>();
327
328         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
329             final ActorSelection selection = actorSystem.actorSelection(address);
330
331             futures.add(FutureConverters.toJava(
332                     actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
333                     .toCompletableFuture());
334         }
335
336         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
337                 futures.toArray(new CompletableFuture[futures.size()]));
338
339         final ActorRef respondTo = getSender();
340
341         combinedFuture
342                 .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
343                 .exceptionally(e -> {
344                     respondTo.tell(new Status.Failure(e), self());
345                     return null;
346                 });
347
348     }
349
350     private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
351         LOG.debug("Received NotifyProducerRemoved: {}", message);
352
353         final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
354         if (registration == null) {
355             LOG.warn("The notification contained a path on which no producer is registered, discarding it");
356             getSender().tell(new Status.Success(null), noSender());
357             return;
358         }
359
360         try {
361             registration.close();
362             getSender().tell(new Status.Success(null), noSender());
363         } catch (final DOMDataTreeProducerException e) {
364             LOG.error("Unable to close producer", e);
365             getSender().tell(new Status.Failure(e), noSender());
366         }
367     }
368
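    // Shard creation first records the new PrefixShardConfiguration in the replicated map (written locally and
    // gossiped to the other members) and then defers the reply: a ShardCreationLookupTask polls until the local
    // backend shard, its leader and the frontend registration are all present before answering the sender.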
369     @SuppressWarnings("checkstyle:IllegalCatch")
370     private void onCreatePrefixShard(final CreatePrefixShard message) {
371         LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
372
373         final PrefixShardConfiguration configuration = message.getConfiguration();
374
375         final Update<ORMap<PrefixShardConfiguration>> update =
376                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
377                     map -> map.put(cluster, configuration.toDataMapKey(), configuration));
378
379         replicator.tell(update, self());
380
381         // schedule a notification task for the reply
382         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
383                 new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
384                         actorContext, shardingService, configuration.getPrefix()),
385                 actorSystem.dispatcher());
386     }
387
388     private void onPrefixShardCreated(final PrefixShardCreated message) {
389         LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
390
391         final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
392         final ActorRef sender = getSender();
393
394         final List<CompletableFuture<Object>> futures = new ArrayList<>();
395
396         for (final String address : addresses) {
397             final ActorSelection actorSelection = actorSystem.actorSelection(address);
398             futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
399                     new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
400         }
401
402         final CompletableFuture<Void> combinedFuture =
403                 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
404
405         combinedFuture.thenRun(() -> {
406             sender.tell(new Status.Success(null), self());
407         }).exceptionally(throwable -> {
408             sender.tell(new Status.Failure(throwable), self());
409             return null;
410         });
411     }
412
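    // Removal mirrors creation: the entry is removed from the replicated map and a ShardRemovalLookupTask polls
    // until the local backend shard disappears. The string key used below is assumed to match the key produced
    // by PrefixShardConfiguration when the shard was created (see the TODO below).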
413     private void onRemovePrefixShard(final RemovePrefixShard message) {
414         LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
415
416         //TODO the removal message should have the configuration or some other way to get to the key
417         final Update<ORMap<PrefixShardConfiguration>> removal =
418                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
419                     map -> map.remove(cluster, "prefix=" + message.getPrefix()));
420         replicator.tell(removal, self());
421
422         final ShardRemovalLookupTask removalTask =
423                 new ShardRemovalLookupTask(actorSystem, getSender(),
424                         actorContext, message.getPrefix());
425
426         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
427     }
428
429     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
430         LOG.debug("Received PrefixShardRemoved: {}", message);
431
432         final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
433
434         if (registration == null) {
435             LOG.warn("Received shard removed for {}, but no shard is registered at this prefix. All registrations: {}",
436                     message.getPrefix(), idToShardRegistration);
437             return;
438         }
439
440         registration.close();
441     }
442
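    // The member name is expected to be carried as the member's first Akka cluster role.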
443     private static MemberName memberToName(final Member member) {
444         return MemberName.forName(member.roles().iterator().next());
445     }
446
447     private class ActorProducerRegistration {
448
449         private final DOMDataTreeProducer producer;
450         private final Collection<DOMDataTreeIdentifier> subtrees;
451
452         ActorProducerRegistration(final DOMDataTreeProducer producer,
453                                   final Collection<DOMDataTreeIdentifier> subtrees) {
454             this.producer = producer;
455             this.subtrees = subtrees;
456         }
457
458         void close() throws DOMDataTreeProducerException {
459             producer.close();
460             subtrees.forEach(idToProducer::remove);
461         }
462     }
463
464     private static class ShardFrontendRegistration extends
465             AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {
466
467         private final ActorRef clientActor;
468         private final ListenerRegistration<DistributedShardFrontend> shardRegistration;
469
470         ShardFrontendRegistration(final ActorRef clientActor,
471                                   final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
472             super(shardRegistration);
473             this.clientActor = clientActor;
474             this.shardRegistration = shardRegistration;
475         }
476
477         @Override
478         protected void removeRegistration() {
479             shardRegistration.close();
480             clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
481         }
482     }
483
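    /**
     * Base class for the retrying lookup tasks below. tryReschedule() lets a subclass re-run itself (at
     * SHARD_LOOKUP_TASK_INTERVAL) until LOOKUP_TASK_MAX_RETRIES is exceeded, at which point the reply future is
     * failed with a DOMDataTreeShardCreationFailedException.
     */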
484     private abstract static class LookupTask implements Runnable {
485
486         private final ActorRef replyTo;
487         private int retries = 0;
488
489         private LookupTask(final ActorRef replyTo) {
490             this.replyTo = replyTo;
491         }
492
493         abstract void reschedule(int retries);
494
495         void tryReschedule(@Nullable final Throwable throwable) {
496             if (retries <= LOOKUP_TASK_MAX_RETRIES) {
497                 retries++;
498                 reschedule(retries);
499             } else {
500                 fail(throwable);
501             }
502         }
503
504         void fail(@Nullable final Throwable throwable) {
505             if (throwable == null) {
506                 replyTo.tell(new Failure(
507                         new DOMDataTreeShardCreationFailedException("Unable to find the backend shard. "
508                                 + "Failing..")), noSender());
509             } else {
510                 replyTo.tell(new Failure(
511                         new DOMDataTreeShardCreationFailedException("Unable to find the backend shard. "
512                                 + "Failing..", throwable)), noSender());
513             }
514         }
515     }
516
517     /**
518      * Handles the lookup step of cds shard creation once the configuration is updated.
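     * The task chains three checks before completing the original sender: local backend shard lookup (this
     * class), shard leader lookup (ShardLeaderLookupTask) and frontend registration lookup (FrontendLookupTask).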
519      */
520     private static class ShardCreationLookupTask extends LookupTask {
521
522         private final ActorSystem system;
523         private final ActorRef replyTo;
524         private final ClusterWrapper clusterWrapper;
525         private final ActorContext context;
526         private final DistributedShardedDOMDataTree shardingService;
527         private final DOMDataTreeIdentifier toLookup;
528
529         ShardCreationLookupTask(final ActorSystem system,
530                                 final ActorRef replyTo,
531                                 final ClusterWrapper clusterWrapper,
532                                 final ActorContext context,
533                                 final DistributedShardedDOMDataTree shardingService,
534                                 final DOMDataTreeIdentifier toLookup) {
535             super(replyTo);
536             this.system = system;
537             this.replyTo = replyTo;
538             this.clusterWrapper = clusterWrapper;
539             this.context = context;
540             this.shardingService = shardingService;
541             this.toLookup = toLookup;
542         }
543
544         @Override
545         public void run() {
546             final Future<ActorRef> localShardFuture =
547                     context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
548
549             localShardFuture.onComplete(new OnComplete<ActorRef>() {
550                 @Override
551                 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
552                     if (throwable != null) {
553                         tryReschedule(throwable);
554                     } else {
555                         LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
556
557                         system.scheduler().scheduleOnce(
558                                 SHARD_LOOKUP_TASK_INTERVAL,
559                                 new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
560                                         shardingService, toLookup),
561                                 system.dispatcher());
562                     }
563                 }
564             }, system.dispatcher());
565         }
566
567         @Override
568         void reschedule(int retries) {
569             LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
570             system.scheduler().scheduleOnce(
571                     SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
572         }
573     }
574
575     /**
576      * Handles the readiness step by waiting for a leader of the created shard.
577      */
578     private static class ShardLeaderLookupTask extends LookupTask {
579
580         private final ActorSystem system;
581         private final ActorRef replyTo;
582         private final ActorContext context;
583         private final ClusterWrapper clusterWrapper;
584         private final ActorRef shard;
585         private final DistributedShardedDOMDataTree shardingService;
586         private final DOMDataTreeIdentifier toLookup;
587
588         ShardLeaderLookupTask(final ActorSystem system,
589                               final ActorRef replyTo,
590                               final ActorContext context,
591                               final ClusterWrapper clusterWrapper,
592                               final ActorRef shard,
593                               final DistributedShardedDOMDataTree shardingService,
594                               final DOMDataTreeIdentifier toLookup) {
595             super(replyTo);
596             this.system = system;
597             this.replyTo = replyTo;
598             this.context = context;
599             this.clusterWrapper = clusterWrapper;
600             this.shard = shard;
601             this.shardingService = shardingService;
602             this.toLookup = toLookup;
603         }
604
605         @Override
606         public void run() {
607
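            // Ask the backend shard whether it has an elected leader; an empty reply means the election has not
            // finished yet and the task is rescheduled.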
608             final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
609
610             ask.onComplete(new OnComplete<Object>() {
611                 @Override
612                 public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
613                     if (throwable != null) {
614                         tryReschedule(throwable);
615                     } else {
616                         final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
617                         final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
618                         if (leaderActor.isPresent()) {
619                             // leader is found, backend seems ready, check if the frontend is ready
620                             LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
621                                     clusterWrapper.getCurrentMemberName(), toLookup);
622                             system.scheduler().scheduleOnce(
623                                     SHARD_LOOKUP_TASK_INTERVAL,
624                                     new FrontendLookupTask(system, replyTo, shardingService, toLookup),
625                                     system.dispatcher());
626                         } else {
627                             tryReschedule(null);
628                         }
629                     }
630                 }
631             }, system.dispatcher());
632
633         }
634
635         @Override
636         void reschedule(int retries) {
637             LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
638                     clusterWrapper.getCurrentMemberName(), toLookup, retries);
639             system.scheduler().scheduleOnce(
640                     SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
641         }
642     }
643
644     /**
645  * After the backend is ready this handles the last step - checking whether a frontend shard exists for the
646  * backend. The frontend should already be present by the time the backend is created, so this is just a sanity
647  * check in case they race. Once the check passes, the future for the cds shard creation is completed and the
 * shard is ready for use.
648      */
649     private static final class FrontendLookupTask extends LookupTask {
650
651         private final ActorSystem system;
652         private final ActorRef replyTo;
653         private final DistributedShardedDOMDataTree shardingService;
654         private final DOMDataTreeIdentifier toLookup;
655
656         FrontendLookupTask(final ActorSystem system,
657                            final ActorRef replyTo,
658                            final DistributedShardedDOMDataTree shardingService,
659                            final DOMDataTreeIdentifier toLookup) {
660             super(replyTo);
661             this.system = system;
662             this.replyTo = replyTo;
663             this.shardingService = shardingService;
664             this.toLookup = toLookup;
665         }
666
667         @Override
668         public void run() {
669             final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
670                     shardingService.lookupShardFrontend(toLookup);
671
672             if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
673                 replyTo.tell(new Success(null), noSender());
674             } else {
675                 tryReschedule(null);
676             }
677         }
678
679         private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
680                                           final DOMDataTreeIdentifier prefix) {
681             if (entry == null) {
682                 return false;
683             }
684
685             if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
686                 return true;
687             }
688
689             if (entry.getIdentifier().equals(prefix.getRootIdentifier().getLastPathArgument())) {
690                 return true;
691             }
692
693             return false;
694         }
695
696         @Override
697         void reschedule(int retries) {
698             LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
699             system.scheduler().scheduleOnce(
700                     SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
701         }
702     }
703
704     /**
705      * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the
706      * configuration.
707      */
708     private static class ShardRemovalLookupTask extends LookupTask {
709
710         private final ActorSystem system;
711         private final ActorRef replyTo;
712         private final ActorContext context;
713         private final DOMDataTreeIdentifier toLookup;
714
715         ShardRemovalLookupTask(final ActorSystem system,
716                                final ActorRef replyTo,
717                                final ActorContext context,
718                                final DOMDataTreeIdentifier toLookup) {
719             super(replyTo);
720             this.system = system;
721             this.replyTo = replyTo;
722             this.context = context;
723             this.toLookup = toLookup;
724         }
725
726         @Override
727         public void run() {
728             final Future<ActorRef> localShardFuture =
729                     context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
730
731             localShardFuture.onComplete(new OnComplete<ActorRef>() {
732                 @Override
733                 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
734                     if (throwable != null) {
735                         //TODO Shouldn't we check why findLocalShard failed?
736                         LOG.debug("Backend shard[{}] removal lookup successful, notifying the registration future",
737                                 toLookup);
738                         replyTo.tell(new Success(null), noSender());
739                     } else {
740                         tryReschedule(null);
741                     }
742                 }
743             }, system.dispatcher());
744         }
745
746         @Override
747         void reschedule(int retries) {
748             LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
749                     toLookup, retries);
750             system.scheduler().scheduleOnce(
751                     SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
752         }
753     }
754
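    /**
     * Builder for the {@link ShardedDataTreeActor} {@link Props}. A typical construction looks roughly like the
     * following (the local variable names are illustrative placeholders, not part of this API):
     *
     * <pre>
     *     ActorRef actor = actorSystem.actorOf(new ShardedDataTreeActorCreator()
     *             .setShardingService(shardingService)
     *             .setActorSystem(actorSystem)
     *             .setClusterWrapper(clusterWrapper)
     *             .setDistributedConfigDatastore(configDatastore)
     *             .setDistributedOperDatastore(operDatastore)
     *             .props());
     * </pre>
     */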
755     public static class ShardedDataTreeActorCreator {
756
757         private DistributedShardedDOMDataTree shardingService;
758         private DistributedDataStore distributedConfigDatastore;
759         private DistributedDataStore distributedOperDatastore;
760         private ActorSystem actorSystem;
761         private ClusterWrapper cluster;
762
763         public DistributedShardedDOMDataTree getShardingService() {
764             return shardingService;
765         }
766
767         public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
768             this.shardingService = shardingService;
769             return this;
770         }
771
772         public ActorSystem getActorSystem() {
773             return actorSystem;
774         }
775
776         public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
777             this.actorSystem = actorSystem;
778             return this;
779         }
780
781         public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
782             this.cluster = cluster;
783             return this;
784         }
785
786         public ClusterWrapper getClusterWrapper() {
787             return cluster;
788         }
789
790         public DistributedDataStore getDistributedConfigDatastore() {
791             return distributedConfigDatastore;
792         }
793
794         public ShardedDataTreeActorCreator setDistributedConfigDatastore(
795                 final DistributedDataStore distributedConfigDatastore) {
796             this.distributedConfigDatastore = distributedConfigDatastore;
797             return this;
798         }
799
800         public DistributedDataStore getDistributedOperDatastore() {
801             return distributedOperDatastore;
802         }
803
804         public ShardedDataTreeActorCreator setDistributedOperDatastore(
805                 final DistributedDataStore distributedOperDatastore) {
806             this.distributedOperDatastore = distributedOperDatastore;
807             return this;
808         }
809
810         private void verify() {
811             Preconditions.checkNotNull(shardingService);
812             Preconditions.checkNotNull(actorSystem);
813             Preconditions.checkNotNull(cluster);
814             Preconditions.checkNotNull(distributedConfigDatastore);
815             Preconditions.checkNotNull(distributedOperDatastore);
816         }
817
818         public Props props() {
819             verify();
820             return Props.create(ShardedDataTreeActor.class, this);
821         }
822     }
823 }