/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.cluster.ddata.DistributedData;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.Replicator;
import akka.cluster.ddata.Replicator.Changed;
import akka.cluster.ddata.Replicator.Subscribe;
import akka.cluster.ddata.Replicator.Update;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
 * nodes of newly open producers/shards on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);

    private static final String PERSISTENCE_ID = "sharding-service-actor";
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
    static final int LOOKUP_TASK_MAX_RETRIES = 100;

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper actorContext used only for static calls to executeAsync etc.
    // for calls that need a specific actor context tied to a datastore, use the one provided in the DistributedDataStore
    private final ActorContext actorContext;
    private final ShardingServiceAddressResolver resolver;
    private final DistributedDataStore distributedConfigDatastore;
    private final DistributedDataStore distributedOperDatastore;

    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
    private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();

    private final Cluster cluster;
    private final ActorRef replicator;

    private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
    private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();

    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        actorContext = distributedConfigDatastore.getActorContext();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        clusterWrapper.subscribeToMemberEvents(self());
        cluster = Cluster.get(actorSystem);

        replicator = DistributedData.get(context().system()).replicator();
    }
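
    /**
     * On start-up, subscribe to changes of the replicated prefix-shard configuration map so that this actor
     * is notified whenever the sharding configuration is updated anywhere in the cluster.
     */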
    @Override
    public void preStart() {
        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
        replicator.tell(subscribe, noSender());
    }

    @Override
    protected void handleRecover(final Object message) throws Exception {
        LOG.debug("Received a recover message {}", message);
    }
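
    /**
     * Dispatches cluster membership events, replicated configuration changes and sharding messages to the
     * appropriate handler methods.
     */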
    @Override
    protected void handleCommand(final Object message) throws Exception {
        LOG.debug("Received {}", message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof Changed) {
            onConfigChanged((Changed) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof CreatePrefixShard) {
            onCreatePrefixShard((CreatePrefixShard) message);
        } else if (message instanceof RemovePrefixShard) {
            onRemovePrefixShard((RemovePrefixShard) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        }
    }
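
    /**
     * Called when the replicated prefix-shard configuration map changes. Recomputes the per-prefix view of the
     * configuration and resolves additions and removals against the previously known configuration.
     */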
    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
        LOG.debug("member: {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);

        currentData = change.dataValue();
        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();

        LOG.debug("Changed set {}", changedConfig);

        try {
            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
                    changedConfig.values().stream().collect(
                            Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
            resolveConfig(newConfig);
        } catch (final IllegalStateException e) {
            LOG.error("Failed to resolve the changed sharding configuration", e);
        }
    }

    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {

        // get the removed configurations
        final SetView<DOMDataTreeIdentifier> deleted =
                Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
        shardingService.resolveShardRemovals(deleted);

        // get the added configurations
        final SetView<DOMDataTreeIdentifier> additions =
                Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
        shardingService.resolveShardAdditions(additions);

        // configurations that existed previously can be ignored here; potential changes in their replicas are
        // handled separately
        currentConfiguration = new HashMap<>(newConfig);
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }

    private void memberUp(final MemberUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberWeaklyUp(final MemberWeaklyUp message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }

    private void memberExited(final MemberExited message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberRemoved(final MemberRemoved message) {
        final MemberName memberName = memberToName(message.member());

        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberUnreachable(final UnreachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.removePeerAddress(memberName);
    }

    private void memberReachable(final ReachableMember message) {
        final MemberName memberName = memberToName(message.member());
        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());

        resolver.addPeerAddress(memberName, message.member().address());
    }
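
    /**
     * Handles a local producer creation by notifying all peer sharding-service actors. The reply to the sender
     * is completed only after every peer has acknowledged the {@link NotifyProducerCreated} message.
     */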
    private void onProducerCreated(final ProducerCreated message) {
        LOG.debug("Received ProducerCreated: {}", message);

        // fastpath if no replication is needed, since there is only one node
        if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        final ActorRef sender = getSender();
        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(
                    FutureConverters.toJava(
                            actorContext.executeOperationAsync(
                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                            .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture.thenRun(() -> {
            sender.tell(new Status.Success(null), noSender());
        }).exceptionally(throwable -> {
            sender.tell(new Status.Failure(throwable), self());
            return null;
        });
    }
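
    /**
     * Handles a notification from a remote node about a newly created producer by opening a corresponding local
     * producer for the same subtrees and remembering its registration.
     */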
    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }
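
    /**
     * Handles a local producer being closed by asking every peer sharding-service actor to remove its
     * corresponding registration; the original sender is answered once all peers have replied.
     */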
    private void onProducerRemoved(final ProducerRemoved message) {
        LOG.debug("Received ProducerRemoved: {}", message);

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
            final ActorSelection selection = actorSystem.actorSelection(address);

            futures.add(FutureConverters.toJava(
                    actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                    .toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[futures.size()]));

        final ActorRef respondTo = getSender();

        combinedFuture
                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
                .exceptionally(e -> {
                    respondTo.tell(new Status.Failure(e), self());
                    return null;
                });
    }
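
    /**
     * Handles a notification from a remote node that a producer was closed by closing and discarding the local
     * registration created for it, replying with success or the close failure.
     */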
    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), noSender());
        }
    }
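
    /**
     * Handles a request to create a prefix-based shard by writing its configuration into the replicated data
     * and scheduling a lookup task that replies to the sender once the shard and its leader become ready.
     */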
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onCreatePrefixShard(final CreatePrefixShard message) {
        LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration configuration = message.getConfiguration();

        final Update<ORMap<PrefixShardConfiguration>> update =
                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                        map -> map.put(cluster, configuration.toDataMapKey(), configuration));

        replicator.tell(update, self());

        // schedule a notification task for the reply
        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
                        actorContext, shardingService, configuration.getPrefix()),
                actorSystem.dispatcher());
    }
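
    /**
     * Handles a locally created prefix shard by asking all peer sharding-service actors to create the shard as
     * well; the sender is notified once every peer has acknowledged the {@link CreatePrefixShard} request.
     */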
    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
        final ActorRef sender = getSender();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (final String address : addresses) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
                    new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture.thenRun(() -> {
            sender.tell(new Status.Success(null), self());
        }).exceptionally(throwable -> {
            sender.tell(new Status.Failure(throwable), self());
            return null;
        });
    }
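
    /**
     * Handles a request to remove a prefix shard by deleting its entry from the replicated configuration and
     * scheduling a lookup task that notifies the sender once the backend shard has actually disappeared.
     */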
    private void onRemovePrefixShard(final RemovePrefixShard message) {
        LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        //TODO the removal message should have the configuration or some other way to get to the key
        final Update<ORMap<PrefixShardConfiguration>> removal =
                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                        map -> map.remove(cluster, "prefix=" + message.getPrefix()));
        replicator.tell(removal, self());

        final ShardRemovalLookupTask removalTask =
                new ShardRemovalLookupTask(actorSystem, getSender(),
                        actorContext, message.getPrefix());

        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
    }
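
    /**
     * Handles a notification that a prefix shard was removed by closing the frontend registration that was
     * created for it, if one is present.
     */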
    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
        LOG.debug("Received PrefixShardRemoved: {}", message);

        final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());

        if (registration == null) {
            LOG.warn("Received shard removed for {}, but no shard is registered at this prefix, all registrations: {}",
                    message.getPrefix(), idToShardRegistration);
            return;
        }

        registration.close();
    }

    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }
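
    /**
     * Pairs a locally created {@link DOMDataTreeProducer} with the subtrees it covers so that it can be closed
     * and unregistered when the remote node reports the producer as removed.
     */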
    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }
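
    /**
     * Registration of a shard frontend; closing it closes the underlying shard registration and stops the
     * associated client actor.
     */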
    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }
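
    /**
     * Base class for the scheduled lookup tasks. Each attempt either reschedules itself (up to
     * LOOKUP_TASK_MAX_RETRIES retries) or fails the reply future with a
     * {@link DOMDataTreeShardCreationFailedException}.
     */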
    private abstract static class LookupTask implements Runnable {

        private final ActorRef replyTo;
        private int retries = 0;

        private LookupTask(final ActorRef replyTo) {
            this.replyTo = replyTo;
        }

        abstract void reschedule(int retries);

        void tryReschedule(@Nullable final Throwable throwable) {
            if (retries <= LOOKUP_TASK_MAX_RETRIES) {
                retries++;
                reschedule(retries);
            } else {
                fail(throwable);
            }
        }

        void fail(@Nullable final Throwable throwable) {
            if (throwable == null) {
                replyTo.tell(new Failure(
                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
                                + " Failing..")), noSender());
            } else {
                replyTo.tell(new Failure(
                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
                                + " Failing..", throwable)), noSender());
            }
        }
    }

    /**
     * Handles the lookup step of cds shard creation once the configuration is updated.
     */
    private static class ShardCreationLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ClusterWrapper clusterWrapper;
        private final ActorContext context;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        ShardCreationLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ClusterWrapper clusterWrapper,
                                final ActorContext context,
                                final DistributedShardedDOMDataTree shardingService,
                                final DOMDataTreeIdentifier toLookup) {
            super(replyTo);
            this.system = system;
            this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
            this.context = context;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
                        system.scheduler().scheduleOnce(
                                SHARD_LOOKUP_TASK_INTERVAL,
                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
                                        shardingService, toLookup),
                                system.dispatcher());
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Handles the readiness step by waiting for a leader of the created shard.
     */
    private static class ShardLeaderLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final ClusterWrapper clusterWrapper;
        private final ActorRef shard;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        ShardLeaderLookupTask(final ActorSystem system,
                              final ActorRef replyTo,
                              final ActorContext context,
                              final ClusterWrapper clusterWrapper,
                              final ActorRef shard,
                              final DistributedShardedDOMDataTree shardingService,
                              final DOMDataTreeIdentifier toLookup) {
            super(replyTo);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.clusterWrapper = clusterWrapper;
            this.shard = shard;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());

            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
                    if (throwable != null) {
                        tryReschedule(throwable);
                    } else {
                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
                        if (leaderActor.isPresent()) {
                            // leader is found, backend seems ready, check if the frontend is ready
                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
                                    clusterWrapper.getCurrentMemberName(), toLookup);
                            system.scheduler().scheduleOnce(
                                    SHARD_LOOKUP_TASK_INTERVAL,
                                    new FrontendLookupTask(system, replyTo, shardingService, toLookup),
                                    system.dispatcher());
                        } else {
                            tryReschedule(null);
                        }
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
        }
    }

    /**
     * After the backend is ready this handles the last step - checking whether we have a frontend shard for the
     * backend. Once this completes (the frontend should already be ready by the time the backend is created, this
     * is just a sanity check in case they race), the future for the cds shard creation is completed and the shard
     * is ready for use.
     */
    private static final class FrontendLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final DistributedShardedDOMDataTree shardingService;
        private final DOMDataTreeIdentifier toLookup;

        FrontendLookupTask(final ActorSystem system,
                           final ActorRef replyTo,
                           final DistributedShardedDOMDataTree shardingService,
                           final DOMDataTreeIdentifier toLookup) {
            super(replyTo);
            this.system = system;
            this.replyTo = replyTo;
            this.shardingService = shardingService;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
                    shardingService.lookupShardFrontend(toLookup);

            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
                replyTo.tell(new Success(null), noSender());
            } else {
                tryReschedule(null);
            }
        }

        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
                                          final DOMDataTreeIdentifier prefix) {
            if (entry == null) {
                return false;
            }

            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
                return true;
            }

            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
                return true;
            }

            return false;
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
        }
    }

    /**
     * Task that is run once a cds shard registration is closed; it completes once the backend shard is removed
     * from the local node.
     */
    private static class ShardRemovalLookupTask extends LookupTask {

        private final ActorSystem system;
        private final ActorRef replyTo;
        private final ActorContext context;
        private final DOMDataTreeIdentifier toLookup;

        ShardRemovalLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
                               final ActorContext context,
                               final DOMDataTreeIdentifier toLookup) {
            super(replyTo);
            this.system = system;
            this.replyTo = replyTo;
            this.context = context;
            this.toLookup = toLookup;
        }

        @Override
        public void run() {
            final Future<ActorRef> localShardFuture =
                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));

            localShardFuture.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
                    if (throwable != null) {
                        //TODO Shouldn't we check why findLocalShard failed?
                        LOG.debug("Backend shard[{}] removal lookup successful, notifying the registration future",
                                toLookup);
                        replyTo.tell(new Success(null), noSender());
                    } else {
                        tryReschedule(null);
                    }
                }
            }, system.dispatcher());
        }

        @Override
        void reschedule(int retries) {
            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
                    toLookup, retries);
            system.scheduler().scheduleOnce(
                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
    }
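
    /**
     * Builder for {@link ShardedDataTreeActor}. All fields are required and are verified before the actor
     * {@link Props} are created. A minimal, illustrative sketch of how the creator is typically wired up
     * (the local variables below are assumed to be available from the caller's context):
     *
     * <pre>
     *     final Props props = new ShardedDataTreeActorCreator()
     *             .setShardingService(shardingService)
     *             .setActorSystem(actorSystem)
     *             .setClusterWrapper(clusterWrapper)
     *             .setDistributedConfigDatastore(configDatastore)
     *             .setDistributedOperDatastore(operDatastore)
     *             .props();
     *     final ActorRef actor = actorSystem.actorOf(props);
     * </pre>
     */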
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private DistributedDataStore distributedConfigDatastore;
        private DistributedDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
            this.cluster = cluster;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public DistributedDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final DistributedDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public DistributedDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final DistributedDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }
}