X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=63fd4a0867ddb340fd55658e01c3e29e515fd163;hp=3c1ae1069e3a656765ba834236fdbf564c3016a1;hb=7204c455a1636a7fc89bcd28fe9e9000eaa81b3b;hpb=afba6e0dc3885261f7f205b5957347aa0744c777

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
index 3c1ae1069e..63fd4a0867 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
@@ -16,6 +16,8 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.cluster.ClusterEvent.MemberExited;
@@ -31,7 +33,10 @@ import akka.cluster.ddata.Replicator;
 import akka.cluster.ddata.Replicator.Changed;
 import akka.cluster.ddata.Replicator.Subscribe;
 import akka.cluster.ddata.Replicator.Update;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
@@ -44,6 +49,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -51,6 +57,8 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
@@ -62,9 +70,17 @@ import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
+import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.compat.java8.FutureConverters;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
@@ -72,9 +88,14 @@ import scala.compat.java8.FutureConverters;
  */
 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
+
     private static final String PERSISTENCE_ID = "sharding-service-actor";
     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
 
+    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
+    static final int LOOKUP_TASK_MAX_RETRIES = 100;
+
     private final DistributedShardedDOMDataTree shardingService;
     private final ActorSystem actorSystem;
     private final ClusterWrapper clusterWrapper;
@@ -356,6 +377,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                         map -> map.put(cluster, configuration.toDataMapKey(), configuration));
 
         replicator.tell(update, self());
+
+        // schedule a notification task for the reply
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
+                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
+                        actorContext, shardingService, configuration.getPrefix()),
+                actorSystem.dispatcher());
     }
 
     private void onPrefixShardCreated(final PrefixShardCreated message) {
@@ -391,6 +418,12 @@
                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                         map -> map.remove(cluster, "prefix=" + message.getPrefix()));
         replicator.tell(removal, self());
+
+        final ShardRemovalLookupTask removalTask =
+                new ShardRemovalLookupTask(actorSystem, getSender(),
+                        actorContext, message.getPrefix());
+
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
     }
 
     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
@@ -448,6 +481,277 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    private abstract static class LookupTask implements Runnable {
+
+        private final ActorRef replyTo;
+        private int retries = 0;
+
+        private LookupTask(final ActorRef replyTo) {
+            this.replyTo = replyTo;
+        }
+
+        abstract void reschedule(int retries);
+
+        void tryReschedule(@Nullable final Throwable throwable) {
+            if (retries <= LOOKUP_TASK_MAX_RETRIES) {
+                retries++;
+                reschedule(retries);
+            } else {
+                fail(throwable);
+            }
+        }
+
+        void fail(@Nullable final Throwable throwable) {
+            if (throwable == null) {
+                replyTo.tell(new Failure(
+                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+                                + " Failing..")), noSender());
+            } else {
+                replyTo.tell(new Failure(
+                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+                                + " Failing..", throwable)), noSender());
+            }
+        }
+    }
+
+    /**
+     * Handles the lookup step of CDS shard creation once the configuration is updated.
+     */
+    private static class ShardCreationLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorContext context;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardCreationLookupTask(final ActorSystem system,
+                                final ActorRef replyTo,
+                                final ClusterWrapper clusterWrapper,
+                                final ActorContext context,
+                                final DistributedShardedDOMDataTree shardingService,
+                                final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.clusterWrapper = clusterWrapper;
+            this.context = context;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final Future<ActorRef> localShardFuture =
+                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
+
+            localShardFuture.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
+
+                        system.scheduler().scheduleOnce(
+                                SHARD_LOOKUP_TASK_INTERVAL,
+                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
+                                        shardingService, toLookup),
+                                system.dispatcher());
+                    }
+                }
+            }, system.dispatcher());
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * Handles the readiness step by waiting for a leader of the created shard.
+     */
+    private static class ShardLeaderLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorRef shard;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardLeaderLookupTask(final ActorSystem system,
+                              final ActorRef replyTo,
+                              final ActorContext context,
+                              final ClusterWrapper clusterWrapper,
+                              final ActorRef shard,
+                              final DistributedShardedDOMDataTree shardingService,
+                              final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.shard = shard;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+
+            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+            ask.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+                        if (leaderActor.isPresent()) {
+                            // leader is found, backend seems ready, check if the frontend is ready
+                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
+                                    clusterWrapper.getCurrentMemberName(), toLookup);
+                            system.scheduler().scheduleOnce(
+                                    SHARD_LOOKUP_TASK_INTERVAL,
+                                    new FrontendLookupTask(system, replyTo, shardingService, toLookup),
+                                    system.dispatcher());
+                        } else {
+                            tryReschedule(null);
+                        }
+                    }
+                }
+            }, system.dispatcher());
+
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
+                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * After the backend is ready, this handles the last step: checking that a frontend shard exists for the backend.
+     * The frontend should already be present by the time the backend is created, so this is only a sanity check in
+     * case the two race. Once the check passes, the future for the CDS shard creation is completed and the shard is
+     * ready for use.
+     */
+    private static final class FrontendLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final DistributedShardedDOMDataTree shardingService;
+        private final DOMDataTreeIdentifier toLookup;
+
+        FrontendLookupTask(final ActorSystem system,
+                           final ActorRef replyTo,
+                           final DistributedShardedDOMDataTree shardingService,
+                           final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.shardingService = shardingService;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
+                    shardingService.lookupShardFrontend(toLookup);
+
+            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
+                replyTo.tell(new Success(null), noSender());
+            } else {
+                tryReschedule(null);
+            }
+        }
+
+        private boolean tableEntryIdCheck(
+                final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry,
+                final DOMDataTreeIdentifier prefix) {
+            if (entry == null) {
+                return false;
+            }
+
+            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
+                return true;
+            }
+
+            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
+                return true;
+            }
+
+            return false;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
+        }
+    }
+
+    /**
+     * Task that is run once a CDS shard registration is closed; it completes once the backend shard is removed from
+     * the configuration.
+     */
+    private static class ShardRemovalLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final DOMDataTreeIdentifier toLookup;
+
+        ShardRemovalLookupTask(final ActorSystem system,
+                               final ActorRef replyTo,
+                               final ActorContext context,
+                               final DOMDataTreeIdentifier toLookup) {
+            super(replyTo);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.toLookup = toLookup;
+        }
+
+        @Override
+        public void run() {
+            final Future<ActorRef> localShardFuture =
+                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
+
+            localShardFuture.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                    if (throwable != null) {
+                        //TODO Shouldn't we check why findLocalShard failed?
+                        LOG.debug("Backend shard[{}] removal lookup successful, notifying the registration future",
+                                toLookup);
+                        replyTo.tell(new Success(null), noSender());
+                    } else {
+                        tryReschedule(null);
+                    }
+                }
+            }, system.dispatcher());
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
+                    toLookup, retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
+        }
+    }
+
     public static class ShardedDataTreeActorCreator {
 
         private DistributedShardedDOMDataTree shardingService;
@@ -515,6 +819,5 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             verify();
             return Props.create(ShardedDataTreeActor.class, this);
         }
-
     }
 }
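
All four lookup classes added in this patch share one bounded-retry pattern: a Runnable re-schedules itself on the actor system scheduler every SHARD_LOOKUP_TASK_INTERVAL (1 second) until its check succeeds or LOOKUP_TASK_MAX_RETRIES (100) attempts are exhausted, at which point the original sender receives a Status.Failure wrapping a DOMDataTreeShardCreationFailedException; with the constants above, each stage therefore gives up after roughly 100 seconds. A minimal standalone sketch of that pattern follows; the names RetryingLookup, RETRY_INTERVAL and MAX_RETRIES are illustrative and not part of the patch.

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Status;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;

// Illustrative sketch of the bounded-retry pattern used by the LookupTask subclasses above.
abstract class RetryingLookup implements Runnable {
    static final FiniteDuration RETRY_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
    static final int MAX_RETRIES = 100;

    private final ActorSystem system;
    private final ActorRef replyTo;
    private int retries = 0;

    RetryingLookup(final ActorSystem system, final ActorRef replyTo) {
        this.system = system;
        this.replyTo = replyTo;
    }

    // Subclasses perform one lookup attempt here and call either succeed() or retry().
    @Override
    public abstract void run();

    final void retry(final Throwable cause) {
        if (retries++ < MAX_RETRIES) {
            // Re-run this task after the configured interval, on the system dispatcher.
            system.scheduler().scheduleOnce(RETRY_INTERVAL, this, system.dispatcher());
        } else {
            // Give up and fail the pending creation/removal future on the caller's side.
            replyTo.tell(new Status.Failure(cause), ActorRef.noSender());
        }
    }

    final void succeed() {
        replyTo.tell(new Status.Success(null), ActorRef.noSender());
    }
}

Running the retries as plain scheduled Runnables keeps the polling off the actor's mailbox; only the final Success or Failure message travels back through the actor system.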
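On the caller's side, these Status replies pair naturally with the ask pattern: the actor that scheduled the lookup chain replies to getSender(), so whoever asked for the shard creation sees its future completed, or failed with the wrapped DOMDataTreeShardCreationFailedException, once the chain finishes. The actual wiring lives in DistributedShardedDOMDataTree and is not part of this diff; the sketch below is a hypothetical, simplified caller using the same Patterns.ask and FutureConverters utilities the file already imports.

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import scala.compat.java8.FutureConverters;

final class ShardCreationClient {
    // Generous timeout: the lookup chain alone can retry for up to ~100 seconds per stage.
    private static final Timeout SHARD_CREATE_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);

    private ShardCreationClient() {
    }

    // Sends the creation request and exposes the reply as a CompletableFuture. Akka completes
    // the ask future exceptionally when the actor replies with Status.Failure, so callers only
    // need to handle normal completion or the wrapped exception.
    static CompletableFuture<Object> createShard(final ActorRef shardedDataTreeActor,
            final Object createPrefixShardMessage) {
        return FutureConverters.toJava(
                Patterns.ask(shardedDataTreeActor, createPrefixShardMessage, SHARD_CREATE_TIMEOUT))
                .toCompletableFuture();
    }
}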
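The reply protocol can also be exercised directly from a test: send the creation message with a probe as the sender and expect a Status.Success once the backend shard, its leader, and the frontend lookup all pass. This is a hypothetical sketch assuming Akka's Java TestKit (akka.testkit.javadsl.TestKit in newer Akka releases; older releases use JavaTestKit) and an already-constructed actor and message.

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Status;
import akka.testkit.javadsl.TestKit;

final class ShardCreationReplyCheck {

    // Drives the actor the way DistributedShardedDOMDataTree would and verifies the reply protocol.
    static void expectSuccessfulCreation(final ActorSystem system, final ActorRef shardedDataTreeActor,
            final Object createPrefixShardMessage) {
        final TestKit probe = new TestKit(system);
        // The lookup tasks capture getSender() of this message and reply to it when done.
        shardedDataTreeActor.tell(createPrefixShardMessage, probe.getRef());
        // Success(null) arrives only after backend, leader and frontend are all found;
        // the probe's default expect timeout may need to be raised on a slow cluster.
        probe.expectMsgClass(Status.Success.class);
    }
}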