/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.sharding;

import static java.util.Objects.requireNonNull;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes
 * of newly opened producers/shards on the local node.
 */
@Deprecated(forRemoval = true)
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper ActorUtils used only for static calls to executeAsync etc.; for calls that need an actor
    // context tied to a specific datastore, use the one provided by that DistributedDataStore
    private final ActorUtils actorUtils;
    private final ShardingServiceAddressResolver resolver;
    private final DistributedDataStoreInterface distributedConfigDatastore;
    private final DistributedDataStoreInterface distributedOperDatastore;
    private final int lookupTaskMaxRetries;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorUtils = distributedConfigDatastore.getActorUtils();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
    }
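
    // Lifecycle: this actor keeps no persistent state, so recovery below is a no-op and preStart needs
    // no setup beyond the member-event subscription done in the constructor.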
    @Override
    public void preStart() {
    }

    @Override
    protected void handleRecover(final Object message) {
        LOG.debug("Received a recover message {}", message);
    }

    @Override
    protected void handleCommand(final Object message) {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }
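
    // Cluster membership events: keep the peer address resolver in sync with the current cluster view.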
    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }
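
    // Producer lifecycle: a local create/remove is fanned out to every known peer as
    // NotifyProducerCreated/NotifyProducerRemoved, and the aggregated acknowledgements complete the caller's future.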
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorUtils.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                            .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }

    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }

    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);
            futures.add(FutureConverters.toJava(
                    actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    // report the actual failure cause rather than a null payload
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), ActorRef.noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), ActorRef.noSender());
        }
    }
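
    /**
     * Schedules a {@link ShardCreationLookupTask} which keeps retrying until the backend shard, its leader and
     * the local frontend are all ready, and only then completes the sender's future.
     */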
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(),
                message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorUtils, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorUtils context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }
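
    /**
     * Derives the {@link MemberName} from a cluster member's first role.
     */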
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }
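
    /**
     * Registration for a producer created locally on behalf of a remote node; closing it releases the
     * producer and removes its subtrees from the lookup map.
     */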
    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }
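
    /**
     * Registration pairing a shard frontend listener registration with its client actor; removing the
     * registration closes the listener and stops the client actor.
     */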
    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorUtils context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorUtils context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready this handles the last step - checking that a frontend shard exists for the
     * backend. The frontend should already be present by the time the backend is created, so this is just a
     * sanity check in case they race; once it passes, the future for the cds shard creation is completed and
     * the shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), ActorRef.noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }
            if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) {
                return true;
            }
            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }
            return false;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a cds shard registration is closed; it completes once the backend shard is
     * removed from the configuration.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorUtils context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), ActorRef.noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), ActorRef.noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorUtils context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is elected, the config shard is ready for use
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), ActorRef.noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }
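
    /**
     * Fluent builder for the {@link Props} of a {@link ShardedDataTreeActor}. A minimal usage sketch (the
     * collaborator variable names are illustrative):
     *
     * <pre>
     *     ActorRef actor = actorSystem.actorOf(new ShardedDataTreeActorCreator()
     *             .setShardingService(shardingService)
     *             .setActorSystem(actorSystem)
     *             .setClusterWrapper(clusterWrapper)
     *             .setDistributedConfigDatastore(configDatastore)
     *             .setDistributedOperDatastore(operDatastore)
     *             .setLookupTaskMaxRetries(10)
     *             .props());
     * </pre>
     */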
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private DistributedDataStoreInterface distributedConfigDatastore;
        private DistributedDataStoreInterface distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
            this.cluster = clusterWrapper;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public DistributedDataStoreInterface getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final DistributedDataStoreInterface distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public DistributedDataStoreInterface getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final DistributedDataStoreInterface distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
            this.maxRetries = newMaxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            requireNonNull(shardingService);
            requireNonNull(actorSystem);
            requireNonNull(cluster);
            requireNonNull(distributedConfigDatastore);
            requireNonNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}