Deprecate DOMDataTreeProducer-related classes
controller.git: opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.sharding;

import static java.util.Objects.requireNonNull;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes of producers/shards
 * newly opened on the local node.
 */
@Deprecated(forRemoval = true)
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // Helper ActorUtils used only for static calls such as executeOperationAsync; for calls that need an actor
    // context tied to a specific datastore, use the one provided by that DistributedDataStore.
    private final ActorUtils actorUtils;
    private final ShardingServiceAddressResolver resolver;
    private final DistributedDataStoreInterface distributedConfigDatastore;
    private final DistributedDataStoreInterface distributedOperDatastore;
    private final int lookupTaskMaxRetries;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorUtils = distributedConfigDatastore.getActorUtils();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
    }

    @Override
    public void preStart() {
    }

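    // This actor does not persist any state of its own (it never calls persist()), so recovery amounts to
    // logging whatever the journal replays, as done in handleRecover() below.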
    @Override
    protected void handleRecover(final Object message) {
        LOG.debug("Received a recover message {}", message);
    }

    @Override
    protected void handleCommand(final Object message) {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

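    // ProducerCreated/ProducerRemoved notifications are fanned out to all known peer sharding-service actors as
    // NotifyProducerCreated/NotifyProducerRemoved; the original sender is answered with Success once every peer
    // has acknowledged, or with Failure if any notification fails.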
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorUtils.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }

    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }

    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), ActorRef.noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), ActorRef.noSender());
        }
    }

    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorUtils, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorUtils context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }

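    // Assumes the ODL member name is published as the member's first Akka cluster role.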
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }

    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }

    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

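    // The shard-creation lookup below proceeds in three chained steps, each scheduled on the actor system
    // dispatcher: ShardCreationLookupTask finds the local backend shard, ShardLeaderLookupTask waits for a shard
    // leader to be elected, and FrontendLookupTask verifies the frontend registration before the original sender
    // is told Success.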
    /**
     * Handles the lookup step of CDS shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorUtils context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorUtils context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Once the backend is ready, this handles the last step: checking whether a frontend shard exists for the
     * backend. The frontend should already be present by the time the backend is created, so this is just a sanity
     * check in case they race. Once the check succeeds, the future for the CDS shard creation is completed and the
     * shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), ActorRef.noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a CDS shard registration is closed; it completes once the backend shard has been
     * removed from the configuration.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorUtils context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        // TODO: Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), ActorRef.noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), ActorRef.noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorUtils context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, the config shard backend is ready
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), ActorRef.noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }

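    /**
     * Builder for {@link ShardedDataTreeActor} {@link Props}. A minimal usage sketch follows; the surrounding
     * objects ({@code system}, {@code cluster}, {@code configDatastore}, {@code operDatastore},
     * {@code shardingService}, {@code maxRetries}) are illustrative placeholders supplied by the caller, not part
     * of this class:
     *
     * <pre>
     *     ActorRef actor = system.actorOf(new ShardedDataTreeActorCreator()
     *             .setShardingService(shardingService)
     *             .setActorSystem(system)
     *             .setClusterWrapper(cluster)
     *             .setDistributedConfigDatastore(configDatastore)
     *             .setDistributedOperDatastore(operDatastore)
     *             .setLookupTaskMaxRetries(maxRetries)
     *             .props());
     * </pre>
     */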
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private DistributedDataStoreInterface distributedConfigDatastore;
        private DistributedDataStoreInterface distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
            this.cluster = clusterWrapper;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public DistributedDataStoreInterface getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final DistributedDataStoreInterface distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public DistributedDataStoreInterface getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final DistributedDataStoreInterface distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
            this.maxRetries = newMaxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            requireNonNull(shardingService);
            requireNonNull(actorSystem);
            requireNonNull(cluster);
            requireNonNull(distributedConfigDatastore);
            requireNonNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}