X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=d0df84893d2d7f75ff91d49a5bd0cb4eec13fddc;hb=77e5860f42832140d27cff8e08e90b0b2947df31;hp=cfbb526e9ca5f28a590625defb9615cde29d57c2;hpb=149feb98f151186975fe42bab5853e05aafd4b51;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index cfbb526e9c..d0df84893d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -17,7 +17,6 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Success; -import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberExited; import akka.cluster.ClusterEvent.MemberRemoved; @@ -41,8 +40,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; @@ -93,16 +92,11 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore private final ActorContext actorContext; private final ShardingServiceAddressResolver resolver; - private final DistributedDataStore distributedConfigDatastore; - private final DistributedDataStore distributedOperDatastore; + private final AbstractDataStore distributedConfigDatastore; + private final AbstractDataStore distributedOperDatastore; private final int lookupTaskMaxRetries; private final Map idToProducer = new HashMap<>(); - private final Map idToShardRegistration = new HashMap<>(); - - private final Cluster cluster; - - private Map currentConfiguration = new HashMap<>(); ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) { LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName()); @@ -118,7 +112,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName()); clusterWrapper.subscribeToMemberEvents(self()); - cluster = Cluster.get(actorSystem); } @Override @@ -246,12 +239,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final CompletableFuture combinedFuture = CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()])); - combinedFuture.thenRun(() -> { - sender.tell(new Status.Success(null), noSender()); - }).exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); + combinedFuture + .thenRun(() -> sender.tell(new Success(null), noSender())) + .exceptionally(throwable -> { + sender.tell(new Status.Failure(throwable), self()); + return null; + }); } private void onNotifyProducerCreated(final NotifyProducerCreated message) { @@ -364,7 +357,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { // schedule a notification task for the reply actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, new ConfigShardLookupTask( - actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries), + actorSystem, getSender(), context, message, lookupTaskMaxRetries), actorSystem.dispatcher()); } @@ -664,21 +657,16 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; private final ActorContext context; - private final ClusterWrapper clusterWrapper; - private final int lookupTaskMaxRetries; ConfigShardLookupTask(final ActorSystem system, final ActorRef replyTo, final ActorContext context, - final ClusterWrapper clusterWrapper, final StartConfigShardLookup message, final int lookupMaxRetries) { super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.context = context; - this.clusterWrapper = clusterWrapper; - this.lookupTaskMaxRetries = lookupMaxRetries; } @Override @@ -696,12 +684,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { if (!localShard.isPresent()) { tryReschedule(null); } else { - LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup.."); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, - new ConfigShardReadinessTask( - system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries), - system.dispatcher()); + LOG.debug("Local backend for prefix configuration shard lookup successful"); + replyTo.tell(new Status.Success(null), noSender()); } } } @@ -768,8 +752,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { public static class ShardedDataTreeActorCreator { private DistributedShardedDOMDataTree shardingService; - private DistributedDataStore distributedConfigDatastore; - private DistributedDataStore distributedOperDatastore; + private AbstractDataStore distributedConfigDatastore; + private AbstractDataStore distributedOperDatastore; private ActorSystem actorSystem; private ClusterWrapper cluster; private int maxRetries; @@ -792,8 +776,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return this; } - public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) { - this.cluster = cluster; + public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) { + this.cluster = clusterWrapper; return this; } @@ -801,28 +785,28 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return cluster; } - public DistributedDataStore getDistributedConfigDatastore() { + public AbstractDataStore getDistributedConfigDatastore() { return distributedConfigDatastore; } public ShardedDataTreeActorCreator setDistributedConfigDatastore( - final DistributedDataStore distributedConfigDatastore) { + final AbstractDataStore distributedConfigDatastore) { this.distributedConfigDatastore = distributedConfigDatastore; return this; } - public DistributedDataStore getDistributedOperDatastore() { + public AbstractDataStore getDistributedOperDatastore() { return distributedOperDatastore; } public ShardedDataTreeActorCreator setDistributedOperDatastore( - final DistributedDataStore distributedOperDatastore) { + final AbstractDataStore distributedOperDatastore) { this.distributedOperDatastore = distributedOperDatastore; return this; } - public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) { - this.maxRetries = maxRetries; + public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) { + this.maxRetries = newMaxRetries; return this; }