2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.Status.Success;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberExited;
23 import akka.cluster.ClusterEvent.MemberRemoved;
24 import akka.cluster.ClusterEvent.MemberUp;
25 import akka.cluster.ClusterEvent.MemberWeaklyUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import akka.pattern.Patterns;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Preconditions;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.TimeUnit;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
44 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
45 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
46 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
47 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
48 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
49 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
50 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
51 import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
52 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
53 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
54 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
55 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
56 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
57 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
58 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
59 import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
60 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
65 import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
66 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
67 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
68 import org.opendaylight.yangtools.concepts.ListenerRegistration;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72 import scala.compat.java8.FutureConverters;
73 import scala.concurrent.Future;
74 import scala.concurrent.duration.FiniteDuration;
77 * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
78 * nodes of newly open producers/shards on the local node.
80 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
82 private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
84 private static final String PERSISTENCE_ID = "sharding-service-actor";
85 private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
87 static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
89 private final DistributedShardedDOMDataTree shardingService;
90 private final ActorSystem actorSystem;
91 private final ClusterWrapper clusterWrapper;
92 // helper actorContext used only for static calls to executeAsync etc
93 // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
94 private final ActorContext actorContext;
95 private final ShardingServiceAddressResolver resolver;
96 private final DistributedDataStore distributedConfigDatastore;
97 private final DistributedDataStore distributedOperDatastore;
98 private final int lookupTaskMaxRetries;
100 private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
101 private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
103 private final Cluster cluster;
105 private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
107 ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
108 LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
110 shardingService = builder.getShardingService();
111 actorSystem = builder.getActorSystem();
112 clusterWrapper = builder.getClusterWrapper();
113 distributedConfigDatastore = builder.getDistributedConfigDatastore();
114 distributedOperDatastore = builder.getDistributedOperDatastore();
115 lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
116 actorContext = distributedConfigDatastore.getActorContext();
117 resolver = new ShardingServiceAddressResolver(
118 DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
120 clusterWrapper.subscribeToMemberEvents(self());
121 cluster = Cluster.get(actorSystem);
125 public void preStart() {
129 protected void handleRecover(final Object message) throws Exception {
130 LOG.debug("Received a recover message {}", message);
134 protected void handleCommand(final Object message) throws Exception {
135 LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
136 if (message instanceof ClusterEvent.MemberUp) {
137 memberUp((ClusterEvent.MemberUp) message);
138 } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
139 memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
140 } else if (message instanceof ClusterEvent.MemberExited) {
141 memberExited((ClusterEvent.MemberExited) message);
142 } else if (message instanceof ClusterEvent.MemberRemoved) {
143 memberRemoved((ClusterEvent.MemberRemoved) message);
144 } else if (message instanceof ClusterEvent.UnreachableMember) {
145 memberUnreachable((ClusterEvent.UnreachableMember) message);
146 } else if (message instanceof ClusterEvent.ReachableMember) {
147 memberReachable((ClusterEvent.ReachableMember) message);
148 } else if (message instanceof ProducerCreated) {
149 onProducerCreated((ProducerCreated) message);
150 } else if (message instanceof NotifyProducerCreated) {
151 onNotifyProducerCreated((NotifyProducerCreated) message);
152 } else if (message instanceof ProducerRemoved) {
153 onProducerRemoved((ProducerRemoved) message);
154 } else if (message instanceof NotifyProducerRemoved) {
155 onNotifyProducerRemoved((NotifyProducerRemoved) message);
156 } else if (message instanceof PrefixShardCreated) {
157 onPrefixShardCreated((PrefixShardCreated) message);
158 } else if (message instanceof LookupPrefixShard) {
159 onLookupPrefixShard((LookupPrefixShard) message);
160 } else if (message instanceof PrefixShardRemovalLookup) {
161 onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
162 } else if (message instanceof PrefixShardRemoved) {
163 onPrefixShardRemoved((PrefixShardRemoved) message);
164 } else if (message instanceof StartConfigShardLookup) {
165 onStartConfigShardLookup((StartConfigShardLookup) message);
170 public String persistenceId() {
171 return PERSISTENCE_ID;
174 private void memberUp(final MemberUp message) {
175 final MemberName memberName = memberToName(message.member());
177 LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
178 message.member().address());
180 resolver.addPeerAddress(memberName, message.member().address());
183 private void memberWeaklyUp(final MemberWeaklyUp message) {
184 final MemberName memberName = memberToName(message.member());
186 LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
187 message.member().address());
189 resolver.addPeerAddress(memberName, message.member().address());
192 private void memberExited(final MemberExited message) {
193 final MemberName memberName = memberToName(message.member());
195 LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
196 message.member().address());
198 resolver.removePeerAddress(memberName);
201 private void memberRemoved(final MemberRemoved message) {
202 final MemberName memberName = memberToName(message.member());
204 LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
205 message.member().address());
207 resolver.removePeerAddress(memberName);
210 private void memberUnreachable(final UnreachableMember message) {
211 final MemberName memberName = memberToName(message.member());
212 LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
214 resolver.removePeerAddress(memberName);
217 private void memberReachable(final ReachableMember message) {
218 final MemberName memberName = memberToName(message.member());
219 LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
221 resolver.addPeerAddress(memberName, message.member().address());
224 private void onProducerCreated(final ProducerCreated message) {
225 LOG.debug("Received ProducerCreated: {}", message);
227 // fastpath if we have no peers
228 if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
229 getSender().tell(new Status.Success(null), noSender());
232 final ActorRef sender = getSender();
233 final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
235 final List<CompletableFuture<Object>> futures = new ArrayList<>();
237 for (final String address : resolver.getShardingServicePeerActorAddresses()) {
238 final ActorSelection actorSelection = actorSystem.actorSelection(address);
240 FutureConverters.toJava(
241 actorContext.executeOperationAsync(
242 actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
243 .toCompletableFuture());
246 final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
247 futures.toArray(new CompletableFuture[futures.size()]));
249 combinedFuture.thenRun(() -> {
250 sender.tell(new Status.Success(null), noSender());
251 }).exceptionally(throwable -> {
252 sender.tell(new Status.Failure(throwable), self());
257 private void onNotifyProducerCreated(final NotifyProducerCreated message) {
258 LOG.debug("Received NotifyProducerCreated: {}", message);
260 final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
263 final ActorProducerRegistration registration =
264 new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
265 subtrees.forEach(id -> idToProducer.put(id, registration));
266 sender().tell(new Status.Success(null), self());
267 } catch (final IllegalArgumentException e) {
268 sender().tell(new Status.Failure(e), getSelf());
272 private void onProducerRemoved(final ProducerRemoved message) {
273 LOG.debug("Received ProducerRemoved: {}", message);
275 final List<CompletableFuture<Object>> futures = new ArrayList<>();
277 for (final String address : resolver.getShardingServicePeerActorAddresses()) {
278 final ActorSelection selection = actorSystem.actorSelection(address);
280 futures.add(FutureConverters.toJava(
281 actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
282 .toCompletableFuture());
285 final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
286 futures.toArray(new CompletableFuture[futures.size()]));
288 final ActorRef respondTo = getSender();
291 .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
292 .exceptionally(e -> {
293 respondTo.tell(new Status.Failure(null), self());
299 private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
300 LOG.debug("Received NotifyProducerRemoved: {}", message);
302 final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
303 if (registration == null) {
304 LOG.warn("The notification contained a path on which no producer is registered, throwing away");
305 getSender().tell(new Status.Success(null), noSender());
310 registration.close();
311 getSender().tell(new Status.Success(null), noSender());
312 } catch (final DOMDataTreeProducerException e) {
313 LOG.error("Unable to close producer", e);
314 getSender().tell(new Status.Failure(e), noSender());
318 @SuppressWarnings("checkstyle:IllegalCatch")
319 private void onLookupPrefixShard(final LookupPrefixShard message) {
320 LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
322 final DOMDataTreeIdentifier prefix = message.getPrefix();
324 final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
325 ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
327 // schedule a notification task for the reply
328 actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
329 new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
330 context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
333 private void onPrefixShardCreated(final PrefixShardCreated message) {
334 LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
336 final PrefixShardConfiguration config = message.getConfiguration();
338 shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
341 private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
342 LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);
344 final ShardRemovalLookupTask removalTask =
345 new ShardRemovalLookupTask(actorSystem, getSender(),
346 actorContext, message.getPrefix(), lookupTaskMaxRetries);
348 actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
351 private void onPrefixShardRemoved(final PrefixShardRemoved message) {
352 LOG.debug("Received PrefixShardRemoved: {}", message);
354 shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
357 private void onStartConfigShardLookup(final StartConfigShardLookup message) {
358 LOG.debug("Received StartConfigShardLookup: {}", message);
360 final ActorContext context =
361 message.getType().equals(LogicalDatastoreType.CONFIGURATION)
362 ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
364 // schedule a notification task for the reply
365 actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
366 new ConfigShardLookupTask(
367 actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
368 actorSystem.dispatcher());
371 private static MemberName memberToName(final Member member) {
372 return MemberName.forName(member.roles().iterator().next());
375 private class ActorProducerRegistration {
377 private final DOMDataTreeProducer producer;
378 private final Collection<DOMDataTreeIdentifier> subtrees;
380 ActorProducerRegistration(final DOMDataTreeProducer producer,
381 final Collection<DOMDataTreeIdentifier> subtrees) {
382 this.producer = producer;
383 this.subtrees = subtrees;
386 void close() throws DOMDataTreeProducerException {
388 subtrees.forEach(idToProducer::remove);
392 private static class ShardFrontendRegistration extends
393 AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {
395 private final ActorRef clientActor;
396 private final ListenerRegistration<DistributedShardFrontend> shardRegistration;
398 ShardFrontendRegistration(final ActorRef clientActor,
399 final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
400 super(shardRegistration);
401 this.clientActor = clientActor;
402 this.shardRegistration = shardRegistration;
406 protected void removeRegistration() {
407 shardRegistration.close();
408 clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
413 * Handles the lookup step of cds shard creation once the configuration is updated.
415 private static class ShardCreationLookupTask extends LookupTask {
417 private final ActorSystem system;
418 private final ActorRef replyTo;
419 private final ClusterWrapper clusterWrapper;
420 private final ActorContext context;
421 private final DistributedShardedDOMDataTree shardingService;
422 private final DOMDataTreeIdentifier toLookup;
423 private final int lookupMaxRetries;
425 ShardCreationLookupTask(final ActorSystem system,
426 final ActorRef replyTo,
427 final ClusterWrapper clusterWrapper,
428 final ActorContext context,
429 final DistributedShardedDOMDataTree shardingService,
430 final DOMDataTreeIdentifier toLookup,
431 final int lookupMaxRetries) {
432 super(replyTo, lookupMaxRetries);
433 this.system = system;
434 this.replyTo = replyTo;
435 this.clusterWrapper = clusterWrapper;
436 this.context = context;
437 this.shardingService = shardingService;
438 this.toLookup = toLookup;
439 this.lookupMaxRetries = lookupMaxRetries;
444 final Future<ActorRef> localShardFuture =
445 context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
447 localShardFuture.onComplete(new OnComplete<ActorRef>() {
449 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
450 if (throwable != null) {
451 tryReschedule(throwable);
453 LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
455 system.scheduler().scheduleOnce(
456 SHARD_LOOKUP_TASK_INTERVAL,
457 new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
458 shardingService, toLookup, lookupMaxRetries),
459 system.dispatcher());
462 }, system.dispatcher());
466 void reschedule(int retries) {
467 LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
468 system.scheduler().scheduleOnce(
469 SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
474 * Handles the readiness step by waiting for a leader of the created shard.
476 private static class ShardLeaderLookupTask extends LookupTask {
478 private final ActorSystem system;
479 private final ActorRef replyTo;
480 private final ActorContext context;
481 private final ClusterWrapper clusterWrapper;
482 private final ActorRef shard;
483 private final DistributedShardedDOMDataTree shardingService;
484 private final DOMDataTreeIdentifier toLookup;
485 private final int lookupMaxRetries;
487 ShardLeaderLookupTask(final ActorSystem system,
488 final ActorRef replyTo,
489 final ActorContext context,
490 final ClusterWrapper clusterWrapper,
491 final ActorRef shard,
492 final DistributedShardedDOMDataTree shardingService,
493 final DOMDataTreeIdentifier toLookup,
494 final int lookupMaxRetries) {
495 super(replyTo, lookupMaxRetries);
496 this.system = system;
497 this.replyTo = replyTo;
498 this.context = context;
499 this.clusterWrapper = clusterWrapper;
501 this.shardingService = shardingService;
502 this.toLookup = toLookup;
503 this.lookupMaxRetries = lookupMaxRetries;
509 final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
511 ask.onComplete(new OnComplete<Object>() {
513 public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
514 if (throwable != null) {
515 tryReschedule(throwable);
517 final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
518 final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
519 if (leaderActor.isPresent()) {
520 // leader is found, backend seems ready, check if the frontend is ready
521 LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
522 clusterWrapper.getCurrentMemberName(), toLookup);
523 system.scheduler().scheduleOnce(
524 SHARD_LOOKUP_TASK_INTERVAL,
525 new FrontendLookupTask(
526 system, replyTo, shardingService, toLookup, lookupMaxRetries),
527 system.dispatcher());
533 }, system.dispatcher());
538 void reschedule(int retries) {
539 LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
540 clusterWrapper.getCurrentMemberName(), toLookup, retries);
541 system.scheduler().scheduleOnce(
542 SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
547 * After backend is ready this handles the last step - checking if we have a frontend shard for the backend,
548 * once this completes(which should be ready by the time the backend is created, this is just a sanity check in
549 * case they race), the future for the cds shard creation is completed and the shard is ready for use.
551 private static final class FrontendLookupTask extends LookupTask {
553 private final ActorSystem system;
554 private final ActorRef replyTo;
555 private final DistributedShardedDOMDataTree shardingService;
556 private final DOMDataTreeIdentifier toLookup;
558 FrontendLookupTask(final ActorSystem system,
559 final ActorRef replyTo,
560 final DistributedShardedDOMDataTree shardingService,
561 final DOMDataTreeIdentifier toLookup,
562 final int lookupMaxRetries) {
563 super(replyTo, lookupMaxRetries);
564 this.system = system;
565 this.replyTo = replyTo;
566 this.shardingService = shardingService;
567 this.toLookup = toLookup;
572 final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
573 shardingService.lookupShardFrontend(toLookup);
575 if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
576 replyTo.tell(new Success(null), noSender());
582 private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
583 final DOMDataTreeIdentifier prefix) {
588 if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
592 if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
600 void reschedule(int retries) {
601 LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
602 system.scheduler().scheduleOnce(
603 SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
608 * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the
611 private static class ShardRemovalLookupTask extends LookupTask {
613 private final ActorSystem system;
614 private final ActorRef replyTo;
615 private final ActorContext context;
616 private final DOMDataTreeIdentifier toLookup;
618 ShardRemovalLookupTask(final ActorSystem system,
619 final ActorRef replyTo,
620 final ActorContext context,
621 final DOMDataTreeIdentifier toLookup,
622 final int lookupMaxRetries) {
623 super(replyTo, lookupMaxRetries);
624 this.system = system;
625 this.replyTo = replyTo;
626 this.context = context;
627 this.toLookup = toLookup;
632 final Future<ActorRef> localShardFuture =
633 context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
635 localShardFuture.onComplete(new OnComplete<ActorRef>() {
637 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
638 if (throwable != null) {
639 //TODO Shouldn't we check why findLocalShard failed?
640 LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
642 replyTo.tell(new Success(null), noSender());
647 }, system.dispatcher());
651 void reschedule(int retries) {
652 LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
654 system.scheduler().scheduleOnce(
655 SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
660 * Task for handling the lookup of the backend for the configuration shard.
662 private static class ConfigShardLookupTask extends LookupTask {
664 private final ActorSystem system;
665 private final ActorRef replyTo;
666 private final ActorContext context;
667 private final ClusterWrapper clusterWrapper;
668 private final int lookupTaskMaxRetries;
670 ConfigShardLookupTask(final ActorSystem system,
671 final ActorRef replyTo,
672 final ActorContext context,
673 final ClusterWrapper clusterWrapper,
674 final StartConfigShardLookup message,
675 final int lookupMaxRetries) {
676 super(replyTo, lookupMaxRetries);
677 this.system = system;
678 this.replyTo = replyTo;
679 this.context = context;
680 this.clusterWrapper = clusterWrapper;
681 this.lookupTaskMaxRetries = lookupMaxRetries;
685 void reschedule(int retries) {
686 LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
687 system.scheduler().scheduleOnce(
688 SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
693 final Optional<ActorRef> localShard =
694 context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
696 if (!localShard.isPresent()) {
699 LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
700 system.scheduler().scheduleOnce(
701 SHARD_LOOKUP_TASK_INTERVAL,
702 new ConfigShardReadinessTask(
703 system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
704 system.dispatcher());
710 * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
712 private static class ConfigShardReadinessTask extends LookupTask {
714 private final ActorSystem system;
715 private final ActorRef replyTo;
716 private final ActorContext context;
717 private final ClusterWrapper clusterWrapper;
718 private final ActorRef shard;
720 ConfigShardReadinessTask(final ActorSystem system,
721 final ActorRef replyTo,
722 final ActorContext context,
723 final ClusterWrapper clusterWrapper,
724 final ActorRef shard,
725 final int lookupMaxRetries) {
726 super(replyTo, lookupMaxRetries);
727 this.system = system;
728 this.replyTo = replyTo;
729 this.context = context;
730 this.clusterWrapper = clusterWrapper;
735 void reschedule(int retries) {
736 LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
737 clusterWrapper.getCurrentMemberName(), retries);
738 system.scheduler().scheduleOnce(
739 SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
744 final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
746 ask.onComplete(new OnComplete<Object>() {
748 public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
749 if (throwable != null) {
750 tryReschedule(throwable);
752 final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
753 final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
754 if (leaderActor.isPresent()) {
755 // leader is found, backend seems ready, check if the frontend is ready
756 LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
757 clusterWrapper.getCurrentMemberName());
758 replyTo.tell(new Status.Success(null), noSender());
764 }, system.dispatcher());
768 public static class ShardedDataTreeActorCreator {
770 private DistributedShardedDOMDataTree shardingService;
771 private DistributedDataStore distributedConfigDatastore;
772 private DistributedDataStore distributedOperDatastore;
773 private ActorSystem actorSystem;
774 private ClusterWrapper cluster;
775 private int maxRetries;
777 public DistributedShardedDOMDataTree getShardingService() {
778 return shardingService;
781 public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
782 this.shardingService = shardingService;
786 public ActorSystem getActorSystem() {
790 public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
791 this.actorSystem = actorSystem;
795 public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
796 this.cluster = cluster;
800 public ClusterWrapper getClusterWrapper() {
804 public DistributedDataStore getDistributedConfigDatastore() {
805 return distributedConfigDatastore;
808 public ShardedDataTreeActorCreator setDistributedConfigDatastore(
809 final DistributedDataStore distributedConfigDatastore) {
810 this.distributedConfigDatastore = distributedConfigDatastore;
814 public DistributedDataStore getDistributedOperDatastore() {
815 return distributedOperDatastore;
818 public ShardedDataTreeActorCreator setDistributedOperDatastore(
819 final DistributedDataStore distributedOperDatastore) {
820 this.distributedOperDatastore = distributedOperDatastore;
824 public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
825 this.maxRetries = maxRetries;
829 public int getLookupTaskMaxRetries() {
833 private void verify() {
834 Preconditions.checkNotNull(shardingService);
835 Preconditions.checkNotNull(actorSystem);
836 Preconditions.checkNotNull(cluster);
837 Preconditions.checkNotNull(distributedConfigDatastore);
838 Preconditions.checkNotNull(distributedOperDatastore);
841 public Props props() {
843 return Props.create(ShardedDataTreeActor.class, this);