/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.cluster.ddata.DistributedData;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.Replicator;
import akka.cluster.ddata.Replicator.Changed;
import akka.cluster.ddata.Replicator.Subscribe;
import akka.cluster.ddata.Replicator.Update;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes of newly open
 * producers/shards on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
    static final int LOOKUP_TASK_MAX_RETRIES = 100;

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper actorContext used only for static calls such as executeAsync;
    // for calls that need an actor context tied to a specific datastore, use the one provided by the
    // corresponding DistributedDataStore
    private final ActorContext actorContext;
    private final ShardingServiceAddressResolver resolver;
    private final DistributedDataStore distributedConfigDatastore;
    private final DistributedDataStore distributedOperDatastore;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
    private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();

    private final Cluster cluster;
    private final ActorRef replicator;

    private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
    private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        actorContext = distributedConfigDatastore.getActorContext();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
        cluster = Cluster.get(actorSystem);

        replicator = DistributedData.get(context().system()).replicator();
    }

    @Override
    public void preStart() {
        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
        replicator.tell(subscribe, noSender());
    }

    @Override
    protected void handleRecover(final Object message) throws Exception {
        LOG.debug("Received a recover message {}", message);
    }

    @Override
    protected void handleCommand(final Object message) throws Exception {
        LOG.debug("Received {}", message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof Changed) {
            onConfigChanged((Changed) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof CreatePrefixShard) {
            onCreatePrefixShard((CreatePrefixShard) message);
        } else if (message instanceof RemovePrefixShard) {
            onRemovePrefixShard((RemovePrefixShard) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        }
    }
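
    // Reacts to changes of the replicated prefix-shard configuration map: the received snapshot is diffed
    // against the last locally known configuration in resolveConfig() below.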
    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
        LOG.debug("Member: {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);

        currentData = change.dataValue();
        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();

        LOG.debug("Changed set {}", changedConfig);

        try {
            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
                    changedConfig.values().stream().collect(
                            Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
            resolveConfig(newConfig);
        } catch (final IllegalStateException e) {
            LOG.error("Failed to resolve the new sharding configuration", e);
        }
    }

    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {

        // get the removed configurations
        final SetView<DOMDataTreeIdentifier> deleted =
                Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
        shardingService.resolveShardRemovals(deleted);

        // get the added configurations
        final SetView<DOMDataTreeIdentifier> additions =
                Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
        shardingService.resolveShardAdditions(additions);

        // we can ignore those that existed previously since potential changes in their replicas are handled
        // outside of this configuration diff
        currentConfiguration = new HashMap<>(newConfig);
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }
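
    // Producer creation is fanned out to every known peer as a NotifyProducerCreated message; the original
    // sender is answered with Success only after all peers have acknowledged, or with Failure on any error.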
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if we have no peers
        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorContext.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                            .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture.thenRun(() -> {
            sender.tell(new Status.Success(null), noSender());
        }).exceptionally(throwable -> {
            sender.tell(new Status.Failure(throwable), self());
            return null;
        });
    }

    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }
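
    // Counterpart of onProducerCreated: asks every peer to drop its registration for the producer's subtrees.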
    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }

    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), noSender());
        }
    }
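
    // Stores the new shard configuration in the replicated data (write-local, disseminated asynchronously by
    // the replicator) and schedules a lookup-task chain that replies to the sender once the backend shard,
    // its leader and the local frontend are all available.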
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onCreatePrefixShard(final CreatePrefixShard message) {
        LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration configuration = message.getConfiguration();

        final Update<ORMap<PrefixShardConfiguration>> update =
                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                    map -> map.put(cluster, configuration.toDataMapKey(), configuration));

        replicator.tell(update, self());

        final ActorContext context =
                configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        context, shardingService, configuration.getPrefix()),
                actorSystem.dispatcher());
    }

    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
        final ActorRef sender = getSender();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : addresses) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
                    new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture.thenRun(() -> {
            sender.tell(new Status.Success(null), self());
        }).exceptionally(throwable -> {
            sender.tell(new Status.Failure(throwable), self());
            return null;
        });
    }
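
    // Removes the configuration entry from the replicated data and schedules a task that replies once the
    // local backend shard has actually been removed.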
    private void onRemovePrefixShard(final RemovePrefixShard message) {
        LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        //TODO the removal message should have the configuration or some other way to get to the key
        final Update<ORMap<PrefixShardConfiguration>> removal =
                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                    map -> map.remove(cluster, "prefix=" + message.getPrefix()));
        replicator.tell(removal, self());

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorContext, message.getPrefix());

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }

    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());

        if (registration == null) {
            LOG.warn("Received shard removed for {}, but no shard is registered at this prefix, all registrations: {}",
                    message.getPrefix(), idToShardRegistration);
            return;
        }

        registration.close();
    }
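
    // The member name is expected to be carried as the member's first cluster role.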
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }

    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }

    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }
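
    /**
     * Base class for the lookup tasks below. A task reschedules itself via {@link #reschedule(int)} up to
     * LOOKUP_TASK_MAX_RETRIES times and then fails the original request with a
     * DOMDataTreeShardCreationFailedException.
     */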
    private abstract static class LookupTask implements Runnable {

        private final ActorRef replyTo;
        private int retries = 0;

        private LookupTask(final ActorRef replyTo) {
            this.replyTo = replyTo;
        }

        abstract void reschedule(int retries);

        void tryReschedule(@Nullable final Throwable throwable) {
            if (retries <= LOOKUP_TASK_MAX_RETRIES) {
                retries++;
                reschedule(retries);
            } else {
                fail(throwable);
            }
        }

        void fail(@Nullable final Throwable throwable) {
            if (throwable == null) {
                replyTo.tell(new Failure(
                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard. Failing..")),
                        noSender());
            } else {
                replyTo.tell(new Failure(
                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard. Failing..",
                                throwable)), noSender());
            }
        }
    }

    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorContext context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorContext context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup) {
            super(replyTo);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);

                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup) {
            super(replyTo);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(system, replyTo, shardingService, toLookup),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready this handles the last step - checking whether a frontend shard exists for
     * the backend. The frontend should already be present by the time the backend is created, so this is just
     * a sanity check in case they race. Once the check passes, the future for the cds shard creation is
     * completed and the shard is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup) {
            super(replyTo);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), noSender());
            } else {
                tryReschedule(null);
            }
        }
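
        // Sanity check that the entry found in the prefix table actually corresponds to the prefix being
        // looked up; the root (empty) prefix is matched by any entry.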
        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            return entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a cds shard registration is closed and completes once the backend shard is removed
     * from the local node.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorContext context,
                               final DOMDataTreeIdentifier toLookup) {
            super(replyTo);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful, notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }
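
    /**
     * Builder for {@link ShardedDataTreeActor} {@link Props}. All dependencies must be set before
     * {@link #props()} is called.
     */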
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private DistributedDataStore distributedConfigDatastore;
        private DistributedDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
            this.cluster = cluster;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public DistributedDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final DistributedDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public DistributedDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final DistributedDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}