/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.sharding;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes about
 * producers/shards newly opened on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

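    // Interval at which the lookup tasks defined below re-check for the backend shard, its leader and the
    // frontend before either replying to the requester or rescheduling themselves.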
    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper ActorUtils used only for static calls such as executeOperationAsync;
    // calls that need an actor context tied to a specific datastore should use the one provided by that
    // datastore's DistributedDataStore
    private final ActorUtils actorUtils;
    private final ShardingServiceAddressResolver resolver;
    private final AbstractDataStore distributedConfigDatastore;
    private final AbstractDataStore distributedOperDatastore;
    private final int lookupTaskMaxRetries;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
        actorUtils = distributedConfigDatastore.getActorUtils();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
    }

    @Override
    public void preStart() {
    }

    @Override
    protected void handleRecover(final Object message) {
        LOG.debug("Received a recover message {}", message);
    }

    @Override
    protected void handleCommand(final Object message) {
        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof LookupPrefixShard) {
            onLookupPrefixShard((LookupPrefixShard) message);
        } else if (message instanceof PrefixShardRemovalLookup) {
            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof StartConfigShardLookup) {
            onStartConfigShardLookup((StartConfigShardLookup) message);
        }
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

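    // The member* handlers below keep the peer address resolver in sync with cluster membership:
    // Up, WeaklyUp and Reachable register the member's address, while Exited, Removed and Unreachable remove it.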
    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

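    /**
     * Notifies all known peer sharding actors about a producer opened for the given subtrees and replies to the
     * sender with success once every peer has acknowledged, or with a failure if any notification fails.
     */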
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorUtils.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                            .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture
                .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
                .exceptionally(throwable -> {
                    sender.tell(new Status.Failure(throwable), self());
                    return null;
                });
    }

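    /**
     * Creates and registers a local producer for the subtrees advertised by a remote node, replying with the
     * outcome to the remote sharding actor.
     */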
    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }

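    /**
     * Notifies all known peer sharding actors that a producer for the given subtrees is being removed and relays
     * the combined result back to the original sender.
     */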
    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }

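    /**
     * Closes and unregisters the local producer registration that corresponds to the subtrees carried in the
     * notification from a remote node.
     */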
    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), ActorRef.noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), ActorRef.noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), ActorRef.noSender());
        }
    }

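    /**
     * Schedules a {@link ShardCreationLookupTask} that waits for the local backend shard, its leader and the
     * frontend to become available before replying to the sender.
     */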
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onLookupPrefixShard(final LookupPrefixShard message) {
        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final DOMDataTreeIdentifier prefix = message.getPrefix();

        final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration config = message.getConfiguration();

        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
    }

    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorUtils, message.getPrefix(), lookupTaskMaxRetries);

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
    }

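    /**
     * Schedules a {@link ConfigShardLookupTask} for the prefix configuration shard of the requested datastore type.
     */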
    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
        LOG.debug("Received StartConfigShardLookup: {}", message);

        final ActorUtils context =
                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ConfigShardLookupTask(
                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
                actorSystem.dispatcher());
    }

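    /**
     * Derives the {@link MemberName} from an Akka cluster member, assuming the first cluster role carries the
     * member name.
     */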
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }

    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }

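    /**
     * Registration pairing a shard frontend listener registration with its client actor; removing the registration
     * closes the frontend registration and stops the client actor.
     */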
    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorUtils context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorUtils context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup, lookupMaxRetries),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;
        private final int lookupMaxRetries;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
            this.lookupMaxRetries = lookupMaxRetries;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(
                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready this handles the last step - checking whether a frontend shard exists for the
     * backend. Once this completes (the frontend should be ready by the time the backend is created, this is just
     * a sanity check in case they race), the future for the cds shard creation is completed and the shard is ready
     * for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup,
                           final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), ActorRef.noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from
     * the local node.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorUtils context,
                               final DOMDataTreeIdentifier toLookup,
                               final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful, notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), ActorRef.noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task for handling the lookup of the backend for the configuration shard.
     */
    private static class ConfigShardLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;

        ConfigShardLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorUtils context,
                              final StartConfigShardLookup message,
                              final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Optional<ActorRef> localShard =
                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);

            if (!localShard.isPresent()) {
                tryReschedule(null);
            } else {
                LOG.debug("Local backend for prefix configuration shard lookup successful");
                replyTo.tell(new Status.Success(null), ActorRef.noSender());
            }
        }
    }

    /**
     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
     */
    private static class ConfigShardReadinessTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorUtils context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;

        ConfigShardReadinessTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ActorUtils context,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorRef shard,
                                 final int lookupMaxRetries) {
            super(replyTo, lookupMaxRetries);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
        }

        @Override
        void reschedule(final int retries) {
            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, the config shard backend is ready, end the lookup
                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
                                    clusterWrapper.getCurrentMemberName());
                            replyTo.tell(new Status.Success(null), ActorRef.noSender());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }
    }

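    /**
     * Builder for the {@link Props} used to instantiate a {@link ShardedDataTreeActor}. A minimal usage sketch,
     * assuming the sharding service, datastores and cluster wrapper are already available (the variable names
     * below are illustrative only):
     *
     * <pre>
     * {@code
     * ActorRef actor = actorSystem.actorOf(new ShardedDataTreeActorCreator()
     *     .setShardingService(shardingService)
     *     .setActorSystem(actorSystem)
     *     .setClusterWrapper(clusterWrapper)
     *     .setDistributedConfigDatastore(configDatastore)
     *     .setDistributedOperDatastore(operDatastore)
     *     .setLookupTaskMaxRetries(10)
     *     .props());
     * }
     * </pre>
     */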
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private AbstractDataStore distributedConfigDatastore;
        private AbstractDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;
        private int maxRetries;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
            this.cluster = clusterWrapper;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public AbstractDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final AbstractDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public AbstractDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final AbstractDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
            this.maxRetries = newMaxRetries;
            return this;
        }

        public int getLookupTaskMaxRetries() {
            return maxRetries;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}