X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=52c3d25faa18ef1de438d06e0c468f05998bf1a0;hp=c1a099b97c49a1d61af6091929b19c0d4deefc84;hb=a35607c5040d0fd561529fde3032c9f49393deeb;hpb=877c428f2897f6e3b11efd25589a84aa0c660a31 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index c1a099b97c..52c3d25faa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -5,10 +5,9 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.sharding; -import static akka.actor.ActorRef.noSender; +import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -16,6 +15,7 @@ import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; +import akka.actor.Status.Success; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberExited; import akka.cluster.ClusterEvent.MemberRemoved; @@ -24,92 +24,107 @@ import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.ClusterEvent.ReachableMember; import akka.cluster.ClusterEvent.UnreachableMember; import akka.cluster.Member; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; import akka.util.Timeout; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; -import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard; +import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; +import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard; import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; -import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard; +import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeService; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; +import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration; +import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry; import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.compat.java8.FutureConverters; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote * nodes of newly open producers/shards on the local node. */ +@Deprecated(forRemoval = true) public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { + private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class); + private static final String PERSISTENCE_ID = "sharding-service-actor"; private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS); - private final DOMDataTreeService dataTreeService; - private final DOMDataTreeShardingService shardingService; + static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS); + + private final DistributedShardedDOMDataTree shardingService; private final ActorSystem actorSystem; - private final ClusterWrapper cluster; + private final ClusterWrapper clusterWrapper; // helper actorContext used only for static calls to executeAsync etc // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final ShardingServiceAddressResolver resolver; - private final DistributedDataStore distributedConfigDatastore; - private final DistributedDataStore distributedOperDatastore; + private final DistributedDataStoreInterface distributedConfigDatastore; + private final DistributedDataStoreInterface distributedOperDatastore; + private final int lookupTaskMaxRetries; private final Map idToProducer = new HashMap<>(); - private final Map idToShardRegistration = new HashMap<>(); ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) { LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName()); - dataTreeService = builder.getDataTreeService(); shardingService = builder.getShardingService(); actorSystem = builder.getActorSystem(); - cluster = builder.getClusterWrapper(); + clusterWrapper = builder.getClusterWrapper(); distributedConfigDatastore = builder.getDistributedConfigDatastore(); distributedOperDatastore = builder.getDistributedOperDatastore(); - actorContext = distributedConfigDatastore.getActorContext(); + lookupTaskMaxRetries = builder.getLookupTaskMaxRetries(); + actorUtils = distributedConfigDatastore.getActorUtils(); resolver = new ShardingServiceAddressResolver( - DistributedShardedDOMDataTree.ACTOR_ID, cluster.getCurrentMemberName()); + DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName()); - cluster.subscribeToMemberEvents(self()); + clusterWrapper.subscribeToMemberEvents(self()); } @Override - protected void handleRecover(final Object message) throws Exception { + public void preStart() { + } + + @Override + protected void handleRecover(final Object message) { LOG.debug("Received a recover message {}", message); } @Override - protected void handleCommand(final Object message) throws Exception { + protected void handleCommand(final Object message) { + LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message); if (message instanceof ClusterEvent.MemberUp) { memberUp((ClusterEvent.MemberUp) message); } else if (message instanceof ClusterEvent.MemberWeaklyUp) { @@ -132,12 +147,14 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { onNotifyProducerRemoved((NotifyProducerRemoved) message); } else if (message instanceof PrefixShardCreated) { onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof CreatePrefixShard) { - onCreatePrefixShard((CreatePrefixShard) message); - } else if (message instanceof RemovePrefixShard) { - onRemovePrefixShard((RemovePrefixShard) message); + } else if (message instanceof LookupPrefixShard) { + onLookupPrefixShard((LookupPrefixShard) message); + } else if (message instanceof PrefixShardRemovalLookup) { + onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message); } else if (message instanceof PrefixShardRemoved) { onPrefixShardRemoved((PrefixShardRemoved) message); + } else if (message instanceof StartConfigShardLookup) { + onStartConfigShardLookup((StartConfigShardLookup) message); } } @@ -198,6 +215,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private void onProducerCreated(final ProducerCreated message) { LOG.debug("Received ProducerCreated: {}", message); + + // fastpath if we have no peers + if (resolver.getShardingServicePeerActorAddresses().isEmpty()) { + getSender().tell(new Status.Success(null), ActorRef.noSender()); + } + final ActorRef sender = getSender(); final Collection subtrees = message.getSubtrees(); @@ -207,7 +230,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ActorSelection actorSelection = actorSystem.actorSelection(address); futures.add( FutureConverters.toJava( - actorContext.executeOperationAsync( + actorUtils.executeOperationAsync( actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT)) .toCompletableFuture()); } @@ -215,24 +238,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final CompletableFuture combinedFuture = CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()])); - combinedFuture.thenRun(() -> { - for (final CompletableFuture future : futures) { - try { - final Object result = future.get(); - if (result instanceof Status.Failure) { - sender.tell(result, self()); - return; - } - } catch (InterruptedException | ExecutionException e) { - sender.tell(new Status.Failure(e), self()); - return; - } - } - sender.tell(new Status.Success(null), noSender()); - }).exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); + combinedFuture + .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender())) + .exceptionally(throwable -> { + sender.tell(new Status.Failure(throwable), self()); + return null; + }); } private void onNotifyProducerCreated(final NotifyProducerCreated message) { @@ -242,7 +253,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { try { final ActorProducerRegistration registration = - new ActorProducerRegistration(dataTreeService.createProducer(subtrees), subtrees); + new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees); subtrees.forEach(id -> idToProducer.put(id, registration)); sender().tell(new Status.Success(null), self()); } catch (final IllegalArgumentException e) { @@ -259,7 +270,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ActorSelection selection = actorSystem.actorSelection(address); futures.add(FutureConverters.toJava( - actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees()))) + actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees()))) .toCompletableFuture()); } @@ -283,130 +294,70 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next()); if (registration == null) { LOG.warn("The notification contained a path on which no producer is registered, throwing away"); - getSender().tell(new Status.Success(null), noSender()); + getSender().tell(new Status.Success(null), ActorRef.noSender()); return; } try { registration.close(); - getSender().tell(new Status.Success(null), noSender()); + getSender().tell(new Status.Success(null), ActorRef.noSender()); } catch (final DOMDataTreeProducerException e) { LOG.error("Unable to close producer", e); - getSender().tell(new Status.Failure(e), noSender()); + getSender().tell(new Status.Failure(e), ActorRef.noSender()); } } @SuppressWarnings("checkstyle:IllegalCatch") - private void onCreatePrefixShard(final CreatePrefixShard message) { - LOG.debug("Received CreatePrefixShard: {}", message); + private void onLookupPrefixShard(final LookupPrefixShard message) { + LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); - final PrefixShardConfiguration configuration = message.getConfiguration(); + final DOMDataTreeIdentifier prefix = message.getPrefix(); - final DOMDataTreeProducer producer = - dataTreeService.createProducer(Collections.singleton(configuration.getPrefix())); + final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION + ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils(); - final DistributedDataStore distributedDataStore = - configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION - ? distributedConfigDatastore : distributedOperDatastore; - final String shardName = ClusterUtils.getCleanShardName(configuration.getPrefix().getRootIdentifier()); - LOG.debug("Creating distributed datastore client for shard {}", shardName); - final Props distributedDataStoreClientProps = - SimpleDataStoreClientActor.props(cluster.getCurrentMemberName(), - "Shard-" + shardName, distributedDataStore.getActorContext(), shardName); - - final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); - final DataStoreClient client; - try { - client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); - } catch (final Exception e) { - LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e); - clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - throw Throwables.propagate(e); - } - - try { - final ListenerRegistration shardFrontendRegistration = - shardingService.registerDataTreeShard(configuration.getPrefix(), - new ShardFrontend( - client, - configuration.getPrefix() - ), - producer); - idToShardRegistration.put(configuration.getPrefix(), - new ShardFrontendRegistration(clientActor, shardFrontendRegistration)); - - sender().tell(new Status.Success(null), self()); - } catch (final DOMDataTreeShardingConflictException e) { - LOG.error("Unable to create shard", e); - clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - sender().tell(new Status.Failure(e), self()); - } finally { - try { - producer.close(); - } catch (final DOMDataTreeProducerException e) { - LOG.error("Unable to close producer that was used for shard registration {}", producer, e); - } - } + // schedule a notification task for the reply + actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, + new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper, + utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher()); } private void onPrefixShardCreated(final PrefixShardCreated message) { - LOG.debug("Received PrefixShardCreated: {}", message); - - final Collection addresses = resolver.getShardingServicePeerActorAddresses(); - final ActorRef sender = getSender(); - - final List> futures = new ArrayList<>(); - - for (final String address : addresses) { - final ActorSelection actorSelection = actorSystem.actorSelection(address); - futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection, - new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture()); - } + LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message); - final CompletableFuture combinedFuture = - CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); + final PrefixShardConfiguration config = message.getConfiguration(); - combinedFuture.thenRun(() -> { - for (final CompletableFuture future : futures) { - try { - final Object result = future.get(); - if (result instanceof Status.Failure) { - sender.tell(result, self()); - return; - } - } catch (InterruptedException | ExecutionException e) { - sender.tell(new Status.Failure(e), self()); - return; - } - } - sender.tell(new Status.Success(null), self()); - }).exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); + shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix())); } - private void onRemovePrefixShard(final RemovePrefixShard message) { - LOG.debug("Received RemovePrefixShard: {}", message); + private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) { + LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message); - for (final String address : resolver.getShardingServicePeerActorAddresses()) { - final ActorSelection selection = actorContext.actorSelection(address); - selection.tell(new PrefixShardRemoved(message.getPrefix()), getSelf()); - } + final ShardRemovalLookupTask removalTask = + new ShardRemovalLookupTask(actorSystem, getSender(), + actorUtils, message.getPrefix(), lookupTaskMaxRetries); + + actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher()); } private void onPrefixShardRemoved(final PrefixShardRemoved message) { LOG.debug("Received PrefixShardRemoved: {}", message); - final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix()); + shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix())); + } - if (registration == null) { - LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}", - message.getPrefix(), idToShardRegistration); - return; - } + private void onStartConfigShardLookup(final StartConfigShardLookup message) { + LOG.debug("Received StartConfigShardLookup: {}", message); - registration.close(); + final ActorUtils context = + message.getType().equals(LogicalDatastoreType.CONFIGURATION) + ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils(); + + // schedule a notification task for the reply + actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, + new ConfigShardLookupTask( + actorSystem, getSender(), context, message, lookupTaskMaxRetries), + actorSystem.dispatcher()); } private static MemberName memberToName(final Member member) { @@ -431,13 +382,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } private static class ShardFrontendRegistration extends - AbstractObjectRegistration> { + AbstractObjectRegistration> { private final ActorRef clientActor; - private final ListenerRegistration shardRegistration; + private final ListenerRegistration shardRegistration; ShardFrontendRegistration(final ActorRef clientActor, - final ListenerRegistration shardRegistration) { + final ListenerRegistration shardRegistration) { super(shardRegistration); this.clientActor = clientActor; this.shardRegistration = shardRegistration; @@ -450,29 +401,367 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } } - public static class ShardedDataTreeActorCreator { + /** + * Handles the lookup step of cds shard creation once the configuration is updated. + */ + private static class ShardCreationLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ClusterWrapper clusterWrapper; + private final ActorUtils context; + private final DistributedShardedDOMDataTree shardingService; + private final DOMDataTreeIdentifier toLookup; + private final int lookupMaxRetries; + + ShardCreationLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ClusterWrapper clusterWrapper, + final ActorUtils context, + final DistributedShardedDOMDataTree shardingService, + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.clusterWrapper = clusterWrapper; + this.context = context; + this.shardingService = shardingService; + this.toLookup = toLookup; + this.lookupMaxRetries = lookupMaxRetries; + } - private DOMDataTreeService dataTreeService; - private DOMDataTreeShardingService shardingService; - private DistributedDataStore distributedConfigDatastore; - private DistributedDataStore distributedOperDatastore; - private ActorSystem actorSystem; - private ClusterWrapper cluster; + @Override + public void run() { + final Future localShardFuture = + context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier())); + + localShardFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable throwable, final ActorRef actorRef) { + if (throwable != null) { + tryReschedule(throwable); + } else { + LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup); + + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, + new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef, + shardingService, toLookup, lookupMaxRetries), + system.dispatcher()); + } + } + }, system.dispatcher()); + } - public DOMDataTreeService getDataTreeService() { - return dataTreeService; + @Override + void reschedule(final int retries) { + LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher()); } + } - public ShardedDataTreeActorCreator setDataTreeService(final DOMDataTreeService dataTreeService) { - this.dataTreeService = dataTreeService; - return this; + /** + * Handles the readiness step by waiting for a leader of the created shard. + */ + private static class ShardLeaderLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorUtils context; + private final ClusterWrapper clusterWrapper; + private final ActorRef shard; + private final DistributedShardedDOMDataTree shardingService; + private final DOMDataTreeIdentifier toLookup; + private final int lookupMaxRetries; + + ShardLeaderLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ActorUtils context, + final ClusterWrapper clusterWrapper, + final ActorRef shard, + final DistributedShardedDOMDataTree shardingService, + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.context = context; + this.clusterWrapper = clusterWrapper; + this.shard = shard; + this.shardingService = shardingService; + this.toLookup = toLookup; + this.lookupMaxRetries = lookupMaxRetries; } - public DOMDataTreeShardingService getShardingService() { + @Override + public void run() { + + final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); + + ask.onComplete(new OnComplete<>() { + @Override + public void onComplete(final Throwable throwable, final Object findLeaderReply) { + if (throwable != null) { + tryReschedule(throwable); + } else { + final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; + final Optional leaderActor = findLeader.getLeaderActor(); + if (leaderActor.isPresent()) { + // leader is found, backend seems ready, check if the frontend is ready + LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..", + clusterWrapper.getCurrentMemberName(), toLookup); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, + new FrontendLookupTask( + system, replyTo, shardingService, toLookup, lookupMaxRetries), + system.dispatcher()); + } else { + tryReschedule(null); + } + } + } + }, system.dispatcher()); + + } + + @Override + void reschedule(final int retries) { + LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..", + clusterWrapper.getCurrentMemberName(), toLookup, retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher()); + } + } + + /** + * After backend is ready this handles the last step - checking if we have a frontend shard for the backend, + * once this completes(which should be ready by the time the backend is created, this is just a sanity check in + * case they race), the future for the cds shard creation is completed and the shard is ready for use. + */ + private static final class FrontendLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final DistributedShardedDOMDataTree shardingService; + private final DOMDataTreeIdentifier toLookup; + + FrontendLookupTask(final ActorSystem system, + final ActorRef replyTo, + final DistributedShardedDOMDataTree shardingService, + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.shardingService = shardingService; + this.toLookup = toLookup; + } + + @Override + public void run() { + final DOMDataTreePrefixTableEntry> entry = + shardingService.lookupShardFrontend(toLookup); + + if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) { + replyTo.tell(new Success(null), ActorRef.noSender()); + } else { + tryReschedule(null); + } + } + + private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry entry, + final DOMDataTreeIdentifier prefix) { + if (entry == null) { + return false; + } + + if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) { + return true; + } + + if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) { + return true; + } + + return false; + } + + @Override + void reschedule(final int retries) { + LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher()); + } + } + + /** + * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the + * configuration. + */ + private static class ShardRemovalLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorUtils context; + private final DOMDataTreeIdentifier toLookup; + + ShardRemovalLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ActorUtils context, + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.context = context; + this.toLookup = toLookup; + } + + @Override + public void run() { + final Future localShardFuture = + context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier())); + + localShardFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable throwable, final ActorRef actorRef) { + if (throwable != null) { + //TODO Shouldn't we check why findLocalShard failed? + LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future", + toLookup); + replyTo.tell(new Success(null), ActorRef.noSender()); + } else { + tryReschedule(null); + } + } + }, system.dispatcher()); + } + + @Override + void reschedule(final int retries) { + LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..", + toLookup, retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher()); + } + } + + /** + * Task for handling the lookup of the backend for the configuration shard. + */ + private static class ConfigShardLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorUtils context; + + ConfigShardLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ActorUtils context, + final StartConfigShardLookup message, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.context = context; + } + + @Override + void reschedule(final int retries) { + LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher()); + } + + @Override + public void run() { + final Optional localShard = + context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID); + + if (!localShard.isPresent()) { + tryReschedule(null); + } else { + LOG.debug("Local backend for prefix configuration shard lookup successful"); + replyTo.tell(new Status.Success(null), ActorRef.noSender()); + } + } + } + + /** + * Task for handling the readiness state of the config shard. Reports success once the leader is elected. + */ + private static class ConfigShardReadinessTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorUtils context; + private final ClusterWrapper clusterWrapper; + private final ActorRef shard; + + ConfigShardReadinessTask(final ActorSystem system, + final ActorRef replyTo, + final ActorUtils context, + final ClusterWrapper clusterWrapper, + final ActorRef shard, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.context = context; + this.clusterWrapper = clusterWrapper; + this.shard = shard; + } + + @Override + void reschedule(final int retries) { + LOG.debug("{} - Leader for config shard not found on try: {}, retrying..", + clusterWrapper.getCurrentMemberName(), retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher()); + } + + @Override + public void run() { + final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); + + ask.onComplete(new OnComplete<>() { + @Override + public void onComplete(final Throwable throwable, final Object findLeaderReply) { + if (throwable != null) { + tryReschedule(throwable); + } else { + final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; + final Optional leaderActor = findLeader.getLeaderActor(); + if (leaderActor.isPresent()) { + // leader is found, backend seems ready, check if the frontend is ready + LOG.debug("{} - Leader for config shard is ready. Ending lookup.", + clusterWrapper.getCurrentMemberName()); + replyTo.tell(new Status.Success(null), ActorRef.noSender()); + } else { + tryReschedule(null); + } + } + } + }, system.dispatcher()); + } + } + + public static class ShardedDataTreeActorCreator { + + private DistributedShardedDOMDataTree shardingService; + private DistributedDataStoreInterface distributedConfigDatastore; + private DistributedDataStoreInterface distributedOperDatastore; + private ActorSystem actorSystem; + private ClusterWrapper cluster; + private int maxRetries; + + public DistributedShardedDOMDataTree getShardingService() { return shardingService; } - public ShardedDataTreeActorCreator setShardingService(final DOMDataTreeShardingService shardingService) { + public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) { this.shardingService = shardingService; return this; } @@ -486,8 +775,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return this; } - public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) { - this.cluster = cluster; + public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) { + this.cluster = clusterWrapper; return this; } @@ -495,39 +784,46 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return cluster; } - public DistributedDataStore getDistributedConfigDatastore() { + public DistributedDataStoreInterface getDistributedConfigDatastore() { return distributedConfigDatastore; } public ShardedDataTreeActorCreator setDistributedConfigDatastore( - final DistributedDataStore distributedConfigDatastore) { + final DistributedDataStoreInterface distributedConfigDatastore) { this.distributedConfigDatastore = distributedConfigDatastore; return this; } - public DistributedDataStore getDistributedOperDatastore() { + public DistributedDataStoreInterface getDistributedOperDatastore() { return distributedOperDatastore; } public ShardedDataTreeActorCreator setDistributedOperDatastore( - final DistributedDataStore distributedOperDatastore) { + final DistributedDataStoreInterface distributedOperDatastore) { this.distributedOperDatastore = distributedOperDatastore; return this; } + public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) { + this.maxRetries = newMaxRetries; + return this; + } + + public int getLookupTaskMaxRetries() { + return maxRetries; + } + private void verify() { - Preconditions.checkNotNull(dataTreeService); - Preconditions.checkNotNull(shardingService); - Preconditions.checkNotNull(actorSystem); - Preconditions.checkNotNull(cluster); - Preconditions.checkNotNull(distributedConfigDatastore); - Preconditions.checkNotNull(distributedOperDatastore); + requireNonNull(shardingService); + requireNonNull(actorSystem); + requireNonNull(cluster); + requireNonNull(distributedConfigDatastore); + requireNonNull(distributedOperDatastore); } public Props props() { verify(); return Props.create(ShardedDataTreeActor.class, this); } - } }