X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=cfbb526e9ca5f28a590625defb9615cde29d57c2;hb=refs%2Fchanges%2F09%2F50609%2F46;hp=63fd4a0867ddb340fd55658e01c3e29e515fd163;hpb=7204c455a1636a7fc89bcd28fe9e9000eaa81b3b;p=controller.git

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
index 63fd4a0867..cfbb526e9c 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
@@ -16,7 +16,6 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
-import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
@@ -27,29 +26,19 @@ import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.ClusterEvent.ReachableMember;
 import akka.cluster.ClusterEvent.UnreachableMember;
 import akka.cluster.Member;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.Replicator;
-import akka.cluster.ddata.Replicator.Changed;
-import akka.cluster.ddata.Replicator.Subscribe;
-import akka.cluster.ddata.Replicator.Update;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -59,14 +48,16 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
@@ -94,7 +85,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
 
     static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
-    static final int LOOKUP_TASK_MAX_RETRIES = 100;
 
     private final DistributedShardedDOMDataTree shardingService;
     private final ActorSystem actorSystem;
@@ -105,14 +95,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private final ShardingServiceAddressResolver resolver;
     private final DistributedDataStore distributedConfigDatastore;
     private final DistributedDataStore distributedOperDatastore;
+    private final int lookupTaskMaxRetries;
 
     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
     private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
 
     private final Cluster cluster;
-    private final ActorRef replicator;
 
-    private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
     private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
 
     ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
@@ -123,21 +112,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         clusterWrapper = builder.getClusterWrapper();
         distributedConfigDatastore = builder.getDistributedConfigDatastore();
         distributedOperDatastore = builder.getDistributedOperDatastore();
+        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
         actorContext = distributedConfigDatastore.getActorContext();
         resolver = new ShardingServiceAddressResolver(
                 DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
 
         clusterWrapper.subscribeToMemberEvents(self());
         cluster = Cluster.get(actorSystem);
-
-        replicator = DistributedData.get(context().system()).replicator();
     }
 
     @Override
     public void preStart() {
-        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
-                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
-        replicator.tell(subscribe, noSender());
     }
 
     @Override
@@ -147,7 +132,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     @Override
     protected void handleCommand(final Object message) throws Exception {
-        LOG.debug("Received {}", message);
+        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
         if (message instanceof ClusterEvent.MemberUp) {
             memberUp((ClusterEvent.MemberUp) message);
         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
@@ -160,8 +145,6 @@ protected void handleCommand(final Object message) throws Exception {
             memberUnreachable((ClusterEvent.UnreachableMember) message);
         } else if (message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if (message instanceof Changed) {
-            onConfigChanged((Changed) message);
         } else if (message instanceof ProducerCreated) {
             onProducerCreated((ProducerCreated) message);
         } else if (message instanceof NotifyProducerCreated) {
@@ -172,51 +155,17 @@ protected void handleCommand(final Object message) throws Exception {
             onNotifyProducerCreated((NotifyProducerCreated) message);
         } else if (message instanceof ProducerRemoved) {
             onProducerRemoved((ProducerRemoved) message);
         } else if (message instanceof NotifyProducerRemoved) {
             onNotifyProducerRemoved((NotifyProducerRemoved) message);
         } else if (message instanceof PrefixShardCreated) {
             onPrefixShardCreated((PrefixShardCreated) message);
-        } else if (message instanceof CreatePrefixShard) {
-            onCreatePrefixShard((CreatePrefixShard) message);
-        } else if (message instanceof RemovePrefixShard) {
-            onRemovePrefixShard((RemovePrefixShard) message);
+        } else if (message instanceof LookupPrefixShard) {
+            onLookupPrefixShard((LookupPrefixShard) message);
+        } else if (message instanceof PrefixShardRemovalLookup) {
+            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
         } else if (message instanceof PrefixShardRemoved) {
             onPrefixShardRemoved((PrefixShardRemoved) message);
+        } else if (message instanceof StartConfigShardLookup) {
+            onStartConfigShardLookup((StartConfigShardLookup) message);
         }
     }
 
-    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
-        LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
-
-        currentData = change.dataValue();
-        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
-        LOG.debug("Changed set {}", changedConfig);
-
-        try {
-            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
-                    changedConfig.values().stream().collect(
-                            Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
-            resolveConfig(newConfig);
-        } catch (final IllegalStateException e) {
-            LOG.error("Failed, ", e);
-        }
-
-    }
-
-    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
-
-        // get the removed configurations
-        final SetView<DOMDataTreeIdentifier> deleted =
-                Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
-        shardingService.resolveShardRemovals(deleted);
-
-        // get the added configurations
-        final SetView<DOMDataTreeIdentifier> additions =
-                Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
-        shardingService.resolveShardAdditions(additions);
-        // we can ignore those that existed previously since the potential changes in replicas will be handled by
-        // shard manager.
-
-        currentConfiguration = new HashMap<>(newConfig);
-    }
-
     @Override
     public String persistenceId() {
         return PERSISTENCE_ID;
@@ -275,8 +224,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private void onProducerCreated(final ProducerCreated message) {
         LOG.debug("Received ProducerCreated: {}", message);
 
-        // fastpath if no replication is needed, since there is only one node
-        if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
+        // fastpath if we have no peers
+        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
             getSender().tell(new Status.Success(null), noSender());
         }
@@ -367,61 +316,34 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void onCreatePrefixShard(final CreatePrefixShard message) {
-        LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
-
-        final PrefixShardConfiguration configuration = message.getConfiguration();
+    private void onLookupPrefixShard(final LookupPrefixShard message) {
+        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
 
-        final Update<ORMap<PrefixShardConfiguration>> update =
-                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
-                        map -> map.put(cluster, configuration.toDataMapKey(), configuration));
+        final DOMDataTreeIdentifier prefix = message.getPrefix();
 
-        replicator.tell(update, self());
+        final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
+                ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
 
         // schedule a notification task for the reply
         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                 new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
-                        actorContext, shardingService, configuration.getPrefix()),
-                actorSystem.dispatcher());
+                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
     }
 
     private void onPrefixShardCreated(final PrefixShardCreated message) {
         LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
 
-        final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
-        final ActorRef sender = getSender();
-
-        final List<CompletableFuture<Object>> futures = new ArrayList<>();
-
-        for (final String address : addresses) {
-            final ActorSelection actorSelection = actorSystem.actorSelection(address);
-            futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
-                    new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
-        }
+        final PrefixShardConfiguration config = message.getConfiguration();
 
-        final CompletableFuture<Void> combinedFuture =
-                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
-
-        combinedFuture.thenRun(() -> {
-            sender.tell(new Status.Success(null), self());
-        }).exceptionally(throwable -> {
-            sender.tell(new Status.Failure(throwable), self());
-            return null;
-        });
+        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
     }
 
-    private void onRemovePrefixShard(final RemovePrefixShard message) {
-        LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
-
-        //TODO the removal message should have the configuration or some other way to get to the key
-        final Update<ORMap<PrefixShardConfiguration>> removal =
-                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
-                        map -> map.remove(cluster, "prefix=" + message.getPrefix()));
-        replicator.tell(removal, self());
+    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
+        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);
 
         final ShardRemovalLookupTask removalTask = new ShardRemovalLookupTask(actorSystem, getSender(),
-                actorContext, message.getPrefix());
+                actorContext, message.getPrefix(), lookupTaskMaxRetries);
 
         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
     }
@@ -429,15 +351,21 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
         LOG.debug("Received PrefixShardRemoved: {}", message);
 
-        final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
+        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
+    }
+
+    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
+        LOG.debug("Received StartConfigShardLookup: {}", message);
 
-        if (registration == null) {
-            LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
-                    message.getPrefix(), idToShardRegistration);
-            return;
-        }
+        final ActorContext context =
+                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
 
-        registration.close();
+        // schedule a notification task for the reply
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
+                new ConfigShardLookupTask(
+                        actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
+                actorSystem.dispatcher());
     }
 
     private static MemberName memberToName(final Member member) {
@@ -481,39 +409,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private abstract static class LookupTask implements Runnable {
-
-        private final ActorRef replyTo;
-        private int retries = 0;
-
-        private LookupTask(final ActorRef replyTo) {
-            this.replyTo = replyTo;
-        }
-
-        abstract void reschedule(int retries);
-
-        void tryReschedule(@Nullable final Throwable throwable) {
-            if (retries <= LOOKUP_TASK_MAX_RETRIES) {
-                retries++;
-                reschedule(retries);
-            } else {
-                fail(throwable);
-            }
-        }
-
-        void fail(@Nullable final Throwable throwable) {
-            if (throwable == null) {
-                replyTo.tell(new Failure(
-                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
-                                + "Failing..")), noSender());
-            } else {
-                replyTo.tell(new Failure(
-                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
                                + "Failing..", throwable)), noSender());
-            }
-        }
-    }
-
     /**
      * Handles the lookup step of cds shard creation once the configuration is updated.
      */
@@ -525,20 +420,23 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         private final ActorContext context;
         private final DistributedShardedDOMDataTree shardingService;
         private final DOMDataTreeIdentifier toLookup;
+        private final int lookupMaxRetries;
 
         ShardCreationLookupTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorContext context,
                                 final DistributedShardedDOMDataTree shardingService,
-                                final DOMDataTreeIdentifier toLookup) {
-            super(replyTo);
+                                final DOMDataTreeIdentifier toLookup,
+                                final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
             this.system = system;
             this.replyTo = replyTo;
            this.clusterWrapper = clusterWrapper;
             this.context = context;
             this.shardingService = shardingService;
             this.toLookup = toLookup;
+            this.lookupMaxRetries = lookupMaxRetries;
         }
 
         @Override
@@ -557,7 +455,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                     system.scheduler().scheduleOnce(
                             SHARD_LOOKUP_TASK_INTERVAL,
                             new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
-                                    shardingService, toLookup),
+                                    shardingService, toLookup, lookupMaxRetries),
                             system.dispatcher());
                 }
             }
@@ -584,6 +482,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         private final ActorRef shard;
         private final DistributedShardedDOMDataTree shardingService;
         private final DOMDataTreeIdentifier toLookup;
+        private final int lookupMaxRetries;
 
         ShardLeaderLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
@@ -591,8 +490,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                               final ClusterWrapper clusterWrapper,
                               final ActorRef shard,
                               final DistributedShardedDOMDataTree shardingService,
-                              final DOMDataTreeIdentifier toLookup) {
-            super(replyTo);
+                              final DOMDataTreeIdentifier toLookup,
+                              final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
             this.system = system;
             this.replyTo = replyTo;
             this.context = context;
@@ -600,6 +500,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             this.shard = shard;
             this.shardingService = shardingService;
             this.toLookup = toLookup;
+            this.lookupMaxRetries = lookupMaxRetries;
         }
 
         @Override
@@ -621,7 +522,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                                 clusterWrapper.getCurrentMemberName(), toLookup);
                         system.scheduler().scheduleOnce(
                                 SHARD_LOOKUP_TASK_INTERVAL,
-                                new FrontendLookupTask(system, replyTo, shardingService, toLookup),
+                                new FrontendLookupTask(
+                                        system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                 system.dispatcher());
                     } else {
                         tryReschedule(null);
@@ -656,8 +558,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         FrontendLookupTask(final ActorSystem system,
                            final ActorRef replyTo,
                            final DistributedShardedDOMDataTree shardingService,
-                           final DOMDataTreeIdentifier toLookup) {
-            super(replyTo);
+                           final DOMDataTreeIdentifier toLookup,
+                           final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
             this.system = system;
             this.replyTo = replyTo;
             this.shardingService = shardingService;
@@ -715,8 +618,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         ShardRemovalLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ActorContext context,
-                               final DOMDataTreeIdentifier toLookup) {
-            super(replyTo);
+                               final DOMDataTreeIdentifier toLookup,
+                               final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
             this.system = system;
             this.replyTo = replyTo;
             this.context = context;
@@ -752,6 +656,115 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    /**
+     * Task for handling the lookup of the backend for the configuration shard.
+     */
+    private static class ConfigShardLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final int lookupTaskMaxRetries;
+
+        ConfigShardLookupTask(final ActorSystem system,
+                              final ActorRef replyTo,
+                              final ActorContext context,
+                              final ClusterWrapper clusterWrapper,
+                              final StartConfigShardLookup message,
+                              final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.lookupTaskMaxRetries = lookupMaxRetries;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
+        }
+
+        @Override
+        public void run() {
+            final Optional<ActorRef> localShard =
+                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+            if (!localShard.isPresent()) {
+                tryReschedule(null);
+            } else {
+                LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
+                system.scheduler().scheduleOnce(
+                        SHARD_LOOKUP_TASK_INTERVAL,
+                        new ConfigShardReadinessTask(
+                                system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
+                        system.dispatcher());
+            }
+        }
+    }
+
+    /**
+     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
+     */
+    private static class ConfigShardReadinessTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorRef shard;
+
+        ConfigShardReadinessTask(final ActorSystem system,
+                                 final ActorRef replyTo,
+                                 final ActorContext context,
+                                 final ClusterWrapper clusterWrapper,
+                                 final ActorRef shard,
+                                 final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.shard = shard;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
+                    clusterWrapper.getCurrentMemberName(), retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
+        }
+
+        @Override
+        public void run() {
+            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+            ask.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+                        if (leaderActor.isPresent()) {
+                            // leader is found, backend seems ready, check if the frontend is ready
+                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
+                                    clusterWrapper.getCurrentMemberName());
+                            replyTo.tell(new Status.Success(null), noSender());
+                        } else {
+                            tryReschedule(null);
+                        }
+                    }
+                }
+            }, system.dispatcher());
+        }
+    }
+
     public static class ShardedDataTreeActorCreator {
 
         private DistributedShardedDOMDataTree shardingService;
@@ -759,6 +772,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         private DistributedDataStore distributedOperDatastore;
         private ActorSystem actorSystem;
         private ClusterWrapper cluster;
+        private int maxRetries;
 
         public DistributedShardedDOMDataTree getShardingService() {
             return shardingService;
@@ -807,6 +821,15 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             return this;
         }
 
+        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        public int getLookupTaskMaxRetries() {
+            return maxRetries;
+        }
+
        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
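
Note: with the LOOKUP_TASK_MAX_RETRIES constant removed, the retry bound for every
LookupTask is now injected through ShardedDataTreeActorCreator. A minimal usage
sketch follows (illustrative only; the surrounding actor-creation plumbing is
assumed, and the value 100 simply mirrors the removed constant):

    // Sketch: the bound handed to each LookupTask now comes from the builder
    // instead of the removed static LOOKUP_TASK_MAX_RETRIES field.
    final ShardedDataTreeActorCreator creator = new ShardedDataTreeActorCreator()
            .setLookupTaskMaxRetries(100);  // was: static final int LOOKUP_TASK_MAX_RETRIES = 100;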