/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.sharding;

import static java.util.Objects.requireNonNull;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes of newly
 * opened producers/shards on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper ActorUtils used only for generic calls such as executeOperationAsync;
    // for calls that need the actor context of a specific datastore, use the one provided by that DistributedDataStore
    private final ActorUtils actorUtils;
    private final ShardingServiceAddressResolver resolver;
    private final DistributedDataStoreInterface distributedConfigDatastore;
    private final DistributedDataStoreInterface distributedOperDatastore;
    private final int lookupTaskMaxRetries;

    // producers opened locally to mirror producers created on remote nodes, keyed by the subtrees they cover
    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorUtils = distributedConfigDatastore.getActorUtils();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
    }

    @Override
    public void preStart() {
    }

    @Override
    protected void handleRecover(final Object message) {
        LOG.debug("Received a recover message {}", message);
    }

    @Override
    protected void handleCommand(final Object message) {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }
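
    // A locally received ProducerCreated/ProducerRemoved is fanned out to every known peer as a
    // NotifyProducerCreated/NotifyProducerRemoved; the original sender is answered only after all peers
    // have replied or any of them has failed.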
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorUtils.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                            .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }

    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }

    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), ActorRef.noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), ActorRef.noSender());
        }
    }
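
    // Shard lookups run as a chain of scheduled tasks: ShardCreationLookupTask waits for the local backend
    // shard, ShardLeaderLookupTask waits for its leader, and FrontendLookupTask waits for the frontend
    // registration; each step reschedules itself at SHARD_LOOKUP_TASK_INTERVAL, bounded by lookupTaskMaxRetries.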
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorUtils, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorUtils context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }

    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }
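
    /**
     * Wraps a producer opened locally in response to a remote peer's NotifyProducerCreated, together with the
     * subtrees it covers, so both the producer and its index entries can be removed when the peer signals removal.
     */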
    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }
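
    /**
     * Registration pairing a shard frontend listener registration with its client actor; removing the
     * registration closes the frontend registration and stops the client actor.
     */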
    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Handles the lookup step of CDS shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorUtils context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorUtils context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready, this handles the last step: checking whether a frontend shard exists for the
     * backend. The frontend should already be ready by the time the backend is created, so this is just a sanity
     * check in case they race. Once this completes, the future for the CDS shard creation is completed and the
     * shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), ActorRef.noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a CDS shard registration is closed; it completes once the backend shard has been
     * removed.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorUtils context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful, notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), ActorRef.noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), ActorRef.noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorUtils context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend is ready
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), ActorRef.noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }
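
    /**
     * Builder for the {@link Props} used to instantiate a {@link ShardedDataTreeActor}. All dependencies are
     * collected via chained setters and validated in {@link #props()}. A typical construction looks roughly like
     * the following sketch (illustrative only, not the actual call site):
     *
     * <pre>
     *     actorSystem.actorOf(new ShardedDataTreeActorCreator()
     *             .setShardingService(shardingService)
     *             .setActorSystem(actorSystem)
     *             .setClusterWrapper(clusterWrapper)
     *             .setDistributedConfigDatastore(configDatastore)
     *             .setDistributedOperDatastore(operDatastore)
     *             .setLookupTaskMaxRetries(maxRetries)
     *             .props(), actorName);
     * </pre>
     */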
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private DistributedDataStoreInterface distributedConfigDatastore;
        private DistributedDataStoreInterface distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
            this.cluster = clusterWrapper;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public DistributedDataStoreInterface getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final DistributedDataStoreInterface distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public DistributedDataStoreInterface getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final DistributedDataStoreInterface distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
            this.maxRetries = newMaxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            requireNonNull(shardingService);
            requireNonNull(actorSystem);
            requireNonNull(cluster);
            requireNonNull(distributedConfigDatastore);
            requireNonNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}