/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes of
 * producers/shards newly opened on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
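
    // Interval at which the lookup tasks below are rescheduled while they wait for a shard backend,
    // its leader or the frontend to become available (bounded by lookupTaskMaxRetries).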
    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper actorContext used only for static calls to executeAsync etc
    // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
    private final ActorContext actorContext;
    private final ShardingServiceAddressResolver resolver;
    private final AbstractDataStore distributedConfigDatastore;
    private final AbstractDataStore distributedOperDatastore;
    private final int lookupTaskMaxRetries;
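
    // Producers opened locally on behalf of remote nodes, keyed by the subtree prefixes they cover.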
    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorContext = distributedConfigDatastore.getActorContext();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
    }

    @Override
    public void preStart() {
    }

    @Override
    protected void handleRecover(final Object message) throws Exception {
        LOG.debug("Received a recover message {}", message);
    }

    @Override
    protected void handleCommand(final Object message) throws Exception {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }
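
    // A producer was opened on this node: notify every known peer so it can register the claimed
    // subtrees locally and reject conflicting producer requests for them.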
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorContext.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                            .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }
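
    // Peer-side counterpart of ProducerCreated: open a local producer for the advertised subtrees so that
    // further producer requests for those subtrees on this node fail until the remote producer is closed.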
    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }
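
    // A local producer is being closed: tell all peers to drop their corresponding registrations before
    // reporting success to the caller.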
    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    // propagate the failure cause to the caller
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), noSender());
        }
    }
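
    // Shard creation is asynchronous: schedule a lookup task that keeps polling until the local shard
    // backend, its leader and the frontend are all ready, and only then replies to the original requester.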
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }
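
    // The shard configuration now contains a new prefix shard: have the sharding service attach the
    // corresponding frontend for it.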
    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorContext, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorContext context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }
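
    // Member names are derived from the member's first cluster role.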
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }
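
    /**
     * Producer created locally on behalf of a producer on a remote node, tracked so it can be closed once
     * the remote producer is removed.
     */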
    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }
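
    /**
     * Registration pairing a shard frontend listener registration with its client actor; closing the
     * registration also stops the client actor.
     */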
    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorContext context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorContext context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready this handles the last step - checking that a frontend shard exists for the
     * backend. The frontend should already be present by the time the backend is created, so this is just a
     * sanity check in case they race. Once the check passes, the future for the cds shard creation is
     * completed and the shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that runs once a cds shard registration is closed; it completes once the backend shard is no
     * longer present on the local node.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorContext context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorContext context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, config shard backend is ready
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }
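
    /**
     * Builder for {@link ShardedDataTreeActor}. Illustrative use (the surrounding variable names are
     * placeholders, not part of this class):
     *
     * <pre>
     *     ActorRef shardingActor = actorSystem.actorOf(new ShardedDataTreeActorCreator()
     *             .setShardingService(shardingService)
     *             .setActorSystem(actorSystem)
     *             .setClusterWrapper(clusterWrapper)
     *             .setDistributedConfigDatastore(configDatastore)
     *             .setDistributedOperDatastore(operDatastore)
     *             .setLookupTaskMaxRetries(10)
     *             .props(), DistributedShardedDOMDataTree.ACTOR_ID);
     * </pre>
     */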
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private AbstractDataStore distributedConfigDatastore;
        private AbstractDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
            this.cluster = clusterWrapper;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public AbstractDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final AbstractDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public AbstractDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final AbstractDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
            this.maxRetries = newMaxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}