sal-distributed-datastore: use lambdas
controller.git: opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
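The subject line presumably refers to replacing anonymous callback classes with Java 8 lambdas and method references, the style now visible in onProducerCreated() and onProducerRemoved() below (CompletableFuture chains and forEach calls). The following is a minimal, self-contained sketch of that kind of conversion using plain JDK types only; the ReplyHandler interface and both helper methods are hypothetical stand-ins, not part of the actual change:

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    final class LambdaConversionSketch {
        // Before: anonymous Runnable/Function instances wired onto the combined future
        static void anonymousStyle(final CompletableFuture<Void> combined, final ReplyHandler handler) {
            combined.thenRun(new Runnable() {
                @Override
                public void run() {
                    handler.success();
                }
            }).exceptionally(new Function<Throwable, Void>() {
                @Override
                public Void apply(final Throwable throwable) {
                    handler.failure(throwable);
                    return null;
                }
            });
        }

        // After: the same wiring expressed with a method reference and a lambda
        static void lambdaStyle(final CompletableFuture<Void> combined, final ReplyHandler handler) {
            combined.thenRun(handler::success)
                    .exceptionally(throwable -> {
                        handler.failure(throwable);
                        return null;
                    });
        }

        // Hypothetical callback target, standing in for the actor's sender().tell(...) calls
        interface ReplyHandler {
            void success();
            void failure(Throwable cause);
        }
    }

The file listing follows.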
/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes
 * of producers/shards newly opened on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper actorContext used only for static calls to executeAsync etc
    // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
    private final ActorContext actorContext;
    private final ShardingServiceAddressResolver resolver;
    private final AbstractDataStore distributedConfigDatastore;
    private final AbstractDataStore distributedOperDatastore;
    private final int lookupTaskMaxRetries;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
    private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();

    private final Cluster cluster;

    private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorContext = distributedConfigDatastore.getActorContext();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
        cluster = Cluster.get(actorSystem);
    }

    @Override
    public void preStart() {
    }

    @Override
    protected void handleRecover(final Object message) throws Exception {
        LOG.debug("Received a recover message {}", message);
    }

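    // All of the actor's messages funnel through handleCommand() below: cluster membership events keep the
    // peer address resolver up to date, producer notifications are fanned out to (or applied on behalf of)
    // peer nodes, and the shard lookup messages schedule the retrying lookup tasks defined further down.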
    @Override
    protected void handleCommand(final Object message) throws Exception {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

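    // Producer creation/removal is propagated to every peer sharding-service actor: each peer is asked via
    // executeOperationAsync(), the resulting Scala Futures are bridged to CompletableFutures, combined with
    // CompletableFuture.allOf(), and the original sender receives Success or Failure once all peers reply.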
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), noSender());
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorContext.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }

    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }

    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    // propagate the actual cause instead of discarding it
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, ignoring it");
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), noSender());
        }
    }

    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorContext, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorContext context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }

    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }

    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }

    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

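    // The lookup tasks below form a chain: ShardCreationLookupTask waits for the local backend shard to
    // appear, ShardLeaderLookupTask waits for that shard to elect a leader, and FrontendLookupTask confirms
    // the frontend registration before the original requester is answered with Success. Each step
    // reschedules itself at SHARD_LOOKUP_TASK_INTERVAL until it succeeds or the retry limit
    // (lookupMaxRetries) passed down from the builder is exhausted.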
    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorContext context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorContext context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {

            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());

        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready this handles the last step - checking whether a frontend shard exists for
     * the backend. The frontend should already be present by the time the backend is created, so this is
     * just a sanity check in case the two race. Once the check succeeds, the future for the cds shard
     * creation is completed and the shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a cds shard registration is closed and completes once the backend shard is removed
     * from the configuration.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorContext context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final int lookupTaskMaxRetries;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final ClusterWrapper clusterWrapper,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.lookupTaskMaxRetries = lookupMaxRetries;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
                system.scheduler().scheduleOnce(
                        SHARD_LOOKUP_TASK_INTERVAL,
                        new ConfigShardReadinessTask(
                                system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
                        system.dispatcher());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorContext context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }

    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private AbstractDataStore distributedConfigDatastore;
        private AbstractDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
            this.cluster = cluster;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public AbstractDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final AbstractDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public AbstractDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final AbstractDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}
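
For orientation, a minimal sketch of how the creator might be wired up, based only on the builder API shown above; the variable names (system, shardingSvc, clusterWrapper, configDatastore, operDatastore), the retry count and the actor name are hypothetical placeholders, not taken from this file:

    ActorRef shardedDataTreeActor = system.actorOf(
            new ShardedDataTreeActor.ShardedDataTreeActorCreator()
                    .setShardingService(shardingSvc)
                    .setActorSystem(system)
                    .setClusterWrapper(clusterWrapper)
                    .setDistributedConfigDatastore(configDatastore)
                    .setDistributedOperDatastore(operDatastore)
                    .setLookupTaskMaxRetries(30)
                    .props(),
            "ShardedDataTreeActor");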