/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes
 * about newly opened producers/shards on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper actorContext used only for static calls to executeAsync etc
    // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
    private final ActorContext actorContext;
    private final ShardingServiceAddressResolver resolver;
    private final AbstractDataStore distributedConfigDatastore;
    private final AbstractDataStore distributedOperDatastore;
    private final int lookupTaskMaxRetries;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
    private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();

    private final Cluster cluster;

    private final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorContext = distributedConfigDatastore.getActorContext();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
        cluster = Cluster.get(actorSystem);
    }

    @Override
    public void preStart() {
    }

    @Override
    protected void handleRecover(final Object message) throws Exception {
        LOG.debug("Received a recover message {}", message);
    }
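
    // Messages are dispatched to the handlers below: cluster membership events keep the peer
    // address resolver up to date, while the sharding messages drive producer and shard lifecycle.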
    @Override
    protected void handleCommand(final Object message) throws Exception {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }
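
    // Cluster membership handlers. Up/WeaklyUp/Reachable members are added to the resolver so they
    // can be notified about producers and shards; Exited/Removed/Unreachable members are dropped from it.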
    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }
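
    /**
     * Notifies all remote sharding service peers about a locally created producer and replies to the
     * original sender once every peer has acknowledged, or with a failure if any notification fails.
     * If there are no peers, the reply is sent immediately.
     */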
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), noSender());
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorContext.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                            .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture.thenRun(() -> {
            sender.tell(new Status.Success(null), noSender());
        }).exceptionally(throwable -> {
            sender.tell(new Status.Failure(throwable), self());
            return null;
        });
    }
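
    /**
     * Handles a notification from a remote node about a producer opened there by creating a matching
     * local producer for the same subtrees and remembering its registration.
     */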
    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }
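
    /**
     * Notifies all remote sharding service peers that a locally opened producer has been removed and
     * replies to the sender once all peers have acknowledged.
     */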
    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(throwable -> {
                    respondTo.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }
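
    /**
     * Handles a notification from a remote node about a closed producer by closing the corresponding
     * local registration, if one exists.
     */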
    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), noSender());
        }
    }
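
    /**
     * Schedules a {@link ShardCreationLookupTask} which replies to the sender once the backend shard,
     * its leader and the local frontend for the given prefix are all ready.
     */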
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }
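
    /**
     * Applies a prefix shard configuration change by resolving the shard addition locally.
     */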
    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorContext, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }
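
    /**
     * Schedules a {@link ConfigShardLookupTask} which replies once the local backend of the prefix
     * configuration shard has been found.
     */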
    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorContext context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }

    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }
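
    // Local bookkeeping for a producer created on behalf of a remote node: closing it also removes
    // the covered subtrees from the idToProducer lookup map.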
    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }

    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorContext context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorContext context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready this handles the last step: checking whether a frontend shard exists for
     * the backend. The frontend should already be present by the time the backend is created, so this is
     * just a sanity check in case they race. Once the check passes, the future for the cds shard creation
     * is completed and the shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a cds shard registration is closed and completes once the backend shard
     * is removed from the configuration.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorContext context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorContext context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, config shard backend is ready
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }
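
    /**
     * Builder used to assemble the dependencies of {@link ShardedDataTreeActor} and create its {@link Props}.
     * A rough usage sketch based on the setters below (variable names are illustrative):
     *
     * <pre>
     *     final Props props = new ShardedDataTreeActorCreator()
     *             .setShardingService(shardingService)
     *             .setActorSystem(actorSystem)
     *             .setClusterWrapper(clusterWrapper)
     *             .setDistributedConfigDatastore(configDatastore)
     *             .setDistributedOperDatastore(operDatastore)
     *             .setLookupTaskMaxRetries(10)
     *             .props();
     * </pre>
     */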
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private AbstractDataStore distributedConfigDatastore;
        private AbstractDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
            this.cluster = cluster;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public AbstractDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final AbstractDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public AbstractDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final AbstractDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}