X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=224ca0b3d0b19ad41209d419351c1faa2d080940;hb=b712eb01354ddb5878008e2a2e8f03fb19b92555;hp=3c1ae1069e3a656765ba834236fdbf564c3016a1;hpb=c1336f9b497bc6867536a24f629c3f0b002ccb2f;p=controller.git

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
index 3c1ae1069e..224ca0b3d0 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
@@ -16,6 +16,7 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
+import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.cluster.ClusterEvent.MemberExited;
@@ -25,46 +26,52 @@ import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.ClusterEvent.ReachableMember;
 import akka.cluster.ClusterEvent.UnreachableMember;
 import akka.cluster.Member;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.Replicator;
-import akka.cluster.ddata.Replicator.Changed;
-import akka.cluster.ddata.Replicator.Subscribe;
-import akka.cluster.ddata.Replicator.Update;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
+import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.compat.java8.FutureConverters;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
@@ -72,9 +79,13 @@ import scala.compat.java8.FutureConverters;
  */
 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
+
     private static final String PERSISTENCE_ID = "sharding-service-actor";
     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
 
+    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
+
     private final DistributedShardedDOMDataTree shardingService;
     private final ActorSystem actorSystem;
     private final ClusterWrapper clusterWrapper;
@@ -82,16 +93,15 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
     private final ActorContext actorContext;
     private final ShardingServiceAddressResolver resolver;
-    private final DistributedDataStore distributedConfigDatastore;
-    private final DistributedDataStore distributedOperDatastore;
+    private final AbstractDataStore distributedConfigDatastore;
+    private final AbstractDataStore distributedOperDatastore;
+    private final int lookupTaskMaxRetries;
 
     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
     private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
 
     private final Cluster cluster;
-    private final ActorRef replicator;
 
-    private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
     private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
 
     ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
@@ -102,21 +112,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         clusterWrapper = builder.getClusterWrapper();
         distributedConfigDatastore = builder.getDistributedConfigDatastore();
         distributedOperDatastore = builder.getDistributedOperDatastore();
+        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
         actorContext = distributedConfigDatastore.getActorContext();
         resolver = new ShardingServiceAddressResolver(
                 DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
 
         clusterWrapper.subscribeToMemberEvents(self());
         cluster = Cluster.get(actorSystem);
-
-        replicator = DistributedData.get(context().system()).replicator();
     }
 
     @Override
     public void preStart() {
-        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
-                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
-        replicator.tell(subscribe, noSender());
     }
 
     @Override
@@ -126,7 +132,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
     @Override
     protected void handleCommand(final Object message) throws Exception {
-        LOG.debug("Received {}", message);
+        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
         if (message instanceof ClusterEvent.MemberUp) {
             memberUp((ClusterEvent.MemberUp) message);
         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
@@ -139,8 +145,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             memberUnreachable((ClusterEvent.UnreachableMember) message);
         } else if (message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if (message instanceof Changed) {
-            onConfigChanged((Changed) message);
         } else if (message instanceof ProducerCreated) {
             onProducerCreated((ProducerCreated) message);
         } else if (message instanceof NotifyProducerCreated) {
@@ -151,51 +155,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             onNotifyProducerCreated((NotifyProducerCreated) message);
         } else if (message instanceof NotifyProducerRemoved) {
             onNotifyProducerRemoved((NotifyProducerRemoved) message);
         } else if (message instanceof PrefixShardCreated) {
             onPrefixShardCreated((PrefixShardCreated) message);
-        } else if (message instanceof CreatePrefixShard) {
-            onCreatePrefixShard((CreatePrefixShard) message);
-        } else if (message instanceof RemovePrefixShard) {
-            onRemovePrefixShard((RemovePrefixShard) message);
+        } else if (message instanceof LookupPrefixShard) {
+            onLookupPrefixShard((LookupPrefixShard) message);
+        } else if (message instanceof PrefixShardRemovalLookup) {
+            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
         } else if (message instanceof PrefixShardRemoved) {
             onPrefixShardRemoved((PrefixShardRemoved) message);
+        } else if (message instanceof StartConfigShardLookup) {
+            onStartConfigShardLookup((StartConfigShardLookup) message);
         }
     }
 
-    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
-        LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
-
-        currentData = change.dataValue();
-        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
-        LOG.debug("Changed set {}", changedConfig);
-
-        try {
-            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
-                    changedConfig.values().stream().collect(
-                            Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
-            resolveConfig(newConfig);
-        } catch (final IllegalStateException e) {
-            LOG.error("Failed, ", e);
-        }
-
-    }
-
-    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
-
-        // get the removed configurations
-        final SetView<DOMDataTreeIdentifier> deleted =
-                Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
-        shardingService.resolveShardRemovals(deleted);
-
-        // get the added configurations
-        final SetView<DOMDataTreeIdentifier> additions =
-                Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
-        shardingService.resolveShardAdditions(additions);
-        // we can ignore those that existed previously since the potential changes in replicas will be handled by
-        // shard manager.
-
-        currentConfiguration = new HashMap<>(newConfig);
-    }
-
     @Override
     public String persistenceId() {
         return PERSISTENCE_ID;
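
With the Replicator subscription gone, callers learn the outcome of a shard lookup by messaging this actor and waiting for the Status reply. A minimal caller-side sketch of that exchange; the ShardLookupClient class and the untyped lookupMessage parameter are illustrative placeholders, not part of this patch:

    import akka.actor.ActorRef;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;
    import scala.compat.java8.FutureConverters;

    final class ShardLookupClient {
        // Ask the sharding actor to run its lookup protocol and surface the eventual
        // Status.Success / Status.Failure reply as a Java CompletionStage.
        CompletionStage<Object> awaitLookup(final ActorRef shardedDataTreeActor, final Object lookupMessage) {
            final Timeout askTimeout = new Timeout(15, TimeUnit.SECONDS);
            return FutureConverters.toJava(Patterns.ask(shardedDataTreeActor, lookupMessage, askTimeout));
        }
    }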
@@ -254,8 +224,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private void onProducerCreated(final ProducerCreated message) {
         LOG.debug("Received ProducerCreated: {}", message);
 
-        // fastpath if no replication is needed, since there is only one node
-        if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
+        // fastpath if we have no peers
+        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
             getSender().tell(new Status.Success(null), noSender());
         }
 
@@ -276,12 +246,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                 futures.toArray(new CompletableFuture[futures.size()]));
 
-        combinedFuture.thenRun(() -> {
-            sender.tell(new Status.Success(null), noSender());
-        }).exceptionally(throwable -> {
-            sender.tell(new Status.Failure(throwable), self());
-            return null;
-        });
+        combinedFuture
+                .thenRun(() -> sender.tell(new Success(null), noSender()))
+                .exceptionally(throwable -> {
+                    sender.tell(new Status.Failure(throwable), self());
+                    return null;
+                });
     }
 
     private void onNotifyProducerCreated(final NotifyProducerCreated message) {
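
onProducerCreated above fans the notification out to every peer and collapses the replies into a single Status message. A self-contained sketch of that aggregation pattern, with the per-peer ask abstracted behind a Function because the real call goes through ActorContext.executeOperationAsync; PeerNotifier and its parameters are illustrative:

    import akka.actor.ActorRef;
    import akka.actor.Status;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    final class PeerNotifier {
        // Fan one future out per peer and reply with a single Status message:
        // Success only when every peer acknowledged, Failure on the first error.
        static void notifyPeers(final List<String> peerAddresses,
                                final Function<String, CompletableFuture<Object>> askPeer,
                                final ActorRef replyTo, final ActorRef self) {
            final List<CompletableFuture<Object>> futures =
                    peerAddresses.stream().map(askPeer).collect(Collectors.toList());

            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                    .thenRun(() -> replyTo.tell(new Status.Success(null), ActorRef.noSender()))
                    .exceptionally(throwable -> {
                        // Any single failure fails the whole operation, mirroring the code above.
                        replyTo.tell(new Status.Failure(throwable), self);
                        return null;
                    });
        }
    }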
@@ -346,65 +316,56 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void onCreatePrefixShard(final CreatePrefixShard message) {
-        LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
+    private void onLookupPrefixShard(final LookupPrefixShard message) {
+        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
 
-        final PrefixShardConfiguration configuration = message.getConfiguration();
+        final DOMDataTreeIdentifier prefix = message.getPrefix();
 
-        final Update<ORMap<PrefixShardConfiguration>> update =
-                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
-                    map -> map.put(cluster, configuration.toDataMapKey(), configuration));
+        final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
+                ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
 
-        replicator.tell(update, self());
+        // schedule a notification task for the reply
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
+                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
+                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
     }
 
     private void onPrefixShardCreated(final PrefixShardCreated message) {
         LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
 
-        final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
-        final ActorRef sender = getSender();
-
-        final List<CompletableFuture<Object>> futures = new ArrayList<>();
-
-        for (final String address : addresses) {
-            final ActorSelection actorSelection = actorSystem.actorSelection(address);
-            futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
-                    new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
-        }
-
-        final CompletableFuture<Void> combinedFuture =
-                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+        final PrefixShardConfiguration config = message.getConfiguration();
 
-        combinedFuture.thenRun(() -> {
-            sender.tell(new Status.Success(null), self());
-        }).exceptionally(throwable -> {
-            sender.tell(new Status.Failure(throwable), self());
-            return null;
-        });
+        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
     }
 
-    private void onRemovePrefixShard(final RemovePrefixShard message) {
-        LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
+    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
+        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);
 
-        //TODO the removal message should have the configuration or some other way to get to the key
-        final Update<ORMap<PrefixShardConfiguration>> removal =
-                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
-                    map -> map.remove(cluster, "prefix=" + message.getPrefix()));
-        replicator.tell(removal, self());
+        final ShardRemovalLookupTask removalTask =
+                new ShardRemovalLookupTask(actorSystem, getSender(),
+                        actorContext, message.getPrefix(), lookupTaskMaxRetries);
+
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
     }
 
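
Each of the handlers above hands its work to a lookup task scheduled on the ActorSystem scheduler instead of blocking the actor. A minimal sketch of that scheduling call in isolation; LookupScheduling and its one-second delay are placeholders mirroring SHARD_LOOKUP_TASK_INTERVAL:

    import akka.actor.ActorSystem;
    import java.util.concurrent.TimeUnit;
    import scala.concurrent.duration.FiniteDuration;

    final class LookupScheduling {
        // Run a task once, one second from now, on the system dispatcher -
        // the same shape as the SHARD_LOOKUP_TASK_INTERVAL scheduling above.
        static void scheduleLookup(final ActorSystem system, final Runnable task) {
            final FiniteDuration delay = new FiniteDuration(1L, TimeUnit.SECONDS);
            system.scheduler().scheduleOnce(delay, task, system.dispatcher());
        }
    }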
     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
         LOG.debug("Received PrefixShardRemoved: {}", message);
 
-        final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
+        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
+    }
 
-        if (registration == null) {
-            LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
-                    message.getPrefix(), idToShardRegistration);
-            return;
-        }
+    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
+        LOG.debug("Received StartConfigShardLookup: {}", message);
+
+        final ActorContext context =
+                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
+                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
 
-        registration.close();
+        // schedule a notification task for the reply
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
+                new ConfigShardLookupTask(
+                        actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
+                actorSystem.dispatcher());
     }
 
     private static MemberName memberToName(final Member member) {
@@ -448,13 +409,370 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    /**
+     * Handles the lookup step of cds shard creation once the configuration is updated.
+     */
+    private static class ShardCreationLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorContext context;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+        private final int lookupMaxRetries;
+
+        ShardCreationLookupTask(final ActorSystem system,
+                                final ActorRef replyTo,
+                                final ClusterWrapper clusterWrapper,
+                                final ActorContext context,
+                                final DistributedShardedDOMDataTree shardingService,
+                                final DOMDataTreeIdentifier toLookup,
+                                final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.clusterWrapper = clusterWrapper;
+            this.context = context;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+            this.lookupMaxRetries = lookupMaxRetries;
+        }
+
+        @Override
+        public void run() {
+            final Future<ActorRef> localShardFuture =
+                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
+
+            localShardFuture.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
+
+                        system.scheduler().scheduleOnce(
+                                SHARD_LOOKUP_TASK_INTERVAL,
+                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
+                                        shardingService, toLookup, lookupMaxRetries),
+                                system.dispatcher());
+                    }
+                }
+            }, system.dispatcher());
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
+        }
+    }
+
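
ShardCreationLookupTask and the tasks below all extend a LookupTask base class that is not included in this diff. The following is only an assumed sketch of what such a retry-counting base might look like, so that super(replyTo, maxRetries), tryReschedule() and reschedule(int) read naturally here; the real class may differ:

    import akka.actor.ActorRef;
    import akka.actor.Status;

    // Assumed shape only: counts attempts, re-runs the concrete task via reschedule(),
    // and fails the original caller once the retry budget is exhausted.
    abstract class LookupTask implements Runnable {
        private final ActorRef replyTo;
        private final int maxRetries;
        private int retried = 0;

        LookupTask(final ActorRef replyTo, final int maxRetries) {
            this.replyTo = replyTo;
            this.maxRetries = maxRetries;
        }

        abstract void reschedule(int retries);

        void tryReschedule(final Throwable throwable) {
            if (retried <= maxRetries) {
                retried++;
                reschedule(retried);
            } else {
                // The cause may be null when the lookup simply found nothing yet.
                replyTo.tell(new Status.Failure(throwable != null
                        ? throwable : new IllegalStateException("Lookup retries exceeded")), ActorRef.noSender());
            }
        }
    }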
+    /**
+     * Handles the readiness step by waiting for a leader of the created shard.
+     */
+    private static class ShardLeaderLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorRef shard;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+        private final int lookupMaxRetries;
+
+        ShardLeaderLookupTask(final ActorSystem system,
+                              final ActorRef replyTo,
+                              final ActorContext context,
+                              final ClusterWrapper clusterWrapper,
+                              final ActorRef shard,
+                              final DistributedShardedDOMDataTree shardingService,
+                              final DOMDataTreeIdentifier toLookup,
+                              final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.shard = shard;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+            this.lookupMaxRetries = lookupMaxRetries;
+        }
+
+        @Override
+        public void run() {
+
+            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+            ask.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+                        if (leaderActor.isPresent()) {
+                            // leader is found, backend seems ready, check if the frontend is ready
+                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
+                                    clusterWrapper.getCurrentMemberName(), toLookup);
+                            system.scheduler().scheduleOnce(
+                                    SHARD_LOOKUP_TASK_INTERVAL,
+                                    new FrontendLookupTask(
+                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
+                                    system.dispatcher());
+                        } else {
+                            tryReschedule(null);
+                        }
+                    }
+                }
+            }, system.dispatcher());
+
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
+                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
+        }
+    }
+
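
The leader probe above is a plain ask of the shard backend. A condensed sketch of that probe on its own, using only the FindLeader/FindLeaderReply calls visible in this patch; LeaderProbe and its callbacks are illustrative:

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.dispatch.OnComplete;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import java.util.concurrent.TimeUnit;
    import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
    import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
    import scala.concurrent.Future;

    final class LeaderProbe {
        // Ask a shard backend whether it currently knows a leader; retrying is left to the caller.
        static void probe(final ActorSystem system, final ActorRef shard,
                final Runnable onLeaderFound, final Runnable onNoLeader) {
            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, new Timeout(5, TimeUnit.SECONDS));
            ask.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(final Throwable failure, final Object reply) {
                    if (failure == null && ((FindLeaderReply) reply).getLeaderActor().isPresent()) {
                        onLeaderFound.run();
                    } else {
                        onNoLeader.run();
                    }
                }
            }, system.dispatcher());
        }
    }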
+    /**
+     * After backend is ready this handles the last step - checking if we have a frontend shard for the backend,
+     * once this completes (which should be ready by the time the backend is created, this is just a sanity check in
+     * case they race), the future for the cds shard creation is completed and the shard is ready for use.
+     */
+    private static final class FrontendLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        FrontendLookupTask(final ActorSystem system,
+                           final ActorRef replyTo,
+                           final DistributedShardedDOMDataTree shardingService,
+                           final DOMDataTreeIdentifier toLookup,
+                           final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
+                    shardingService.lookupShardFrontend(toLookup);
+
+            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
+                replyTo.tell(new Success(null), noSender());
+            } else {
+                tryReschedule(null);
+            }
+        }
+
+        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry entry,
+                                          final DOMDataTreeIdentifier prefix) {
+            if (entry == null) {
+                return false;
+            }
+
+            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
+                return true;
+            }
+
+            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
+                return true;
+            }
+
+            return false;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from
+     * the configuration.
+     */
+    private static class ShardRemovalLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardRemovalLookupTask(final ActorSystem system,
+                               final ActorRef replyTo,
+                               final ActorContext context,
+                               final DOMDataTreeIdentifier toLookup,
+                               final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final Future<ActorRef> localShardFuture =
+                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
+
+            localShardFuture.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                    if (throwable != null) {
+                        //TODO Shouldn't we check why findLocalShard failed?
+                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
+                                toLookup);
+                        replyTo.tell(new Success(null), noSender());
+                    } else {
+                        tryReschedule(null);
+                    }
+                }
+            }, system.dispatcher());
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
+                    toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
        }
+    }
+
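
ShardRemovalLookupTask inverts the usual check: a failed findLocalShardAsync is treated as proof that the backend shard is gone. A small one-shot sketch of that inversion without the retry loop; ShardAbsenceCheck and its parameters are illustrative:

    import akka.actor.ActorRef;
    import akka.dispatch.OnComplete;
    import java.util.concurrent.CompletableFuture;
    import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
    import scala.concurrent.ExecutionContext;
    import scala.concurrent.Future;

    final class ShardAbsenceCheck {
        // Completes the returned future once the named shard can no longer be found locally,
        // mirroring the inverted success/failure handling in ShardRemovalLookupTask.
        static CompletableFuture<Void> whenShardGone(final ActorContext context, final String shardName,
                final ExecutionContext executor) {
            final CompletableFuture<Void> result = new CompletableFuture<>();
            final Future<ActorRef> lookup = context.findLocalShardAsync(shardName);
            lookup.onComplete(new OnComplete<ActorRef>() {
                @Override
                public void onComplete(final Throwable failure, final ActorRef shard) {
                    if (failure != null) {
                        result.complete(null); // lookup failed -> shard is gone
                    } else {
                        result.completeExceptionally(new IllegalStateException("shard still present: " + shard));
                    }
                }
            }, executor);
            return result;
        }
    }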
+    /**
+     * Task for handling the lookup of the backend for the configuration shard.
+     */
+    private static class ConfigShardLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final int lookupTaskMaxRetries;
+
+        ConfigShardLookupTask(final ActorSystem system,
+                              final ActorRef replyTo,
+                              final ActorContext context,
+                              final ClusterWrapper clusterWrapper,
+                              final StartConfigShardLookup message,
+                              final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.lookupTaskMaxRetries = lookupMaxRetries;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
+        }
+
+        @Override
+        public void run() {
+            final Optional<ActorRef> localShard =
+                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+            if (!localShard.isPresent()) {
+                tryReschedule(null);
+            } else {
+                LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
+                system.scheduler().scheduleOnce(
+                        SHARD_LOOKUP_TASK_INTERVAL,
+                        new ConfigShardReadinessTask(
+                                system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
+                        system.dispatcher());
+            }
+        }
+    }
+
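
ConfigShardLookupTask uses the synchronous findLocalShard variant, which returns a Guava Optional rather than a future. A compact sketch of that presence check; LocalShardPresence and its callbacks are illustrative:

    import akka.actor.ActorRef;
    import com.google.common.base.Optional;
    import java.util.function.Consumer;
    import org.opendaylight.controller.cluster.datastore.utils.ActorContext;

    final class LocalShardPresence {
        // Synchronous variant of the lookup: act immediately if the shard is registered
        // locally, otherwise let the caller reschedule (as ConfigShardLookupTask does).
        static void check(final ActorContext context, final String shardName,
                final Consumer<ActorRef> onFound, final Runnable onMissing) {
            final Optional<ActorRef> localShard = context.findLocalShard(shardName);
            if (localShard.isPresent()) {
                onFound.accept(localShard.get());
            } else {
                onMissing.run();
            }
        }
    }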
+    /**
+     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
+     */
+    private static class ConfigShardReadinessTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorRef shard;
+
+        ConfigShardReadinessTask(final ActorSystem system,
+                                 final ActorRef replyTo,
+                                 final ActorContext context,
+                                 final ClusterWrapper clusterWrapper,
+                                 final ActorRef shard,
+                                 final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.shard = shard;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
+                    clusterWrapper.getCurrentMemberName(), retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
+        }
+
+        @Override
+        public void run() {
+            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+            ask.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+                        if (leaderActor.isPresent()) {
+                            // leader is found, backend seems ready, check if the frontend is ready
+                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
+                                    clusterWrapper.getCurrentMemberName());
+                            replyTo.tell(new Status.Success(null), noSender());
+                        } else {
+                            tryReschedule(null);
+                        }
+                    }
+                }
+            }, system.dispatcher());
+        }
+    }
+
     public static class ShardedDataTreeActorCreator {
 
         private DistributedShardedDOMDataTree shardingService;
-        private DistributedDataStore distributedConfigDatastore;
-        private DistributedDataStore distributedOperDatastore;
+        private AbstractDataStore distributedConfigDatastore;
+        private AbstractDataStore distributedOperDatastore;
         private ActorSystem actorSystem;
         private ClusterWrapper cluster;
+        private int maxRetries;
 
         public DistributedShardedDOMDataTree getShardingService() {
             return shardingService;
@@ -483,26 +801,35 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             return cluster;
         }
 
-        public DistributedDataStore getDistributedConfigDatastore() {
+        public AbstractDataStore getDistributedConfigDatastore() {
             return distributedConfigDatastore;
         }
 
         public ShardedDataTreeActorCreator setDistributedConfigDatastore(
-                final DistributedDataStore distributedConfigDatastore) {
+                final AbstractDataStore distributedConfigDatastore) {
             this.distributedConfigDatastore = distributedConfigDatastore;
             return this;
         }
 
-        public DistributedDataStore getDistributedOperDatastore() {
+        public AbstractDataStore getDistributedOperDatastore() {
             return distributedOperDatastore;
         }
 
         public ShardedDataTreeActorCreator setDistributedOperDatastore(
-                final DistributedDataStore distributedOperDatastore) {
+                final AbstractDataStore distributedOperDatastore) {
             this.distributedOperDatastore = distributedOperDatastore;
             return this;
         }
 
+        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        public int getLookupTaskMaxRetries() {
+            return maxRetries;
+        }
+
         private void verify() {
             Preconditions.checkNotNull(shardingService);
             Preconditions.checkNotNull(actorSystem);
@@ -515,6 +842,5 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             verify();
             return Props.create(ShardedDataTreeActor.class, this);
         }
-
     }
 }
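
For completeness, a hedged sketch of how the reworked creator might be wired up. Only setDistributedConfigDatastore, setDistributedOperDatastore and setLookupTaskMaxRetries appear in this patch; the remaining setters, the props() method and the actor name are assumptions for illustration:

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
    import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
    import org.opendaylight.controller.cluster.sharding.DistributedShardedDOMDataTree;
    import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;

    final class ShardingActorWiring {
        static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
                final DistributedShardedDOMDataTree shardingService,
                final ClusterWrapper clusterWrapper,
                final AbstractDataStore configDatastore,
                final AbstractDataStore operDatastore) {
            // setShardingService/setActorSystem/setClusterWrapper/props() are assumed builder
            // methods; the datastore and retry setters are the ones shown in this diff.
            return actorSystem.actorOf(new ShardedDataTreeActor.ShardedDataTreeActorCreator()
                    .setShardingService(shardingService)
                    .setActorSystem(actorSystem)
                    .setClusterWrapper(clusterWrapper)
                    .setDistributedConfigDatastore(configDatastore)
                    .setDistributedOperDatastore(operDatastore)
                    .setLookupTaskMaxRetries(10)
                    .props(), "shardedDataTreeActorName"); // actor name is a placeholder
        }
    }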