X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=8e8ff491d3088db64eb1c013c3ae2cb2433b1221;hb=3dc48592696e6a4535c0e125c1e23dbc62bc9091;hp=cfbb526e9ca5f28a590625defb9615cde29d57c2;hpb=149feb98f151186975fe42bab5853e05aafd4b51;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index cfbb526e9c..8e8ff491d3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.sharding; -import static akka.actor.ActorRef.noSender; - import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -17,7 +15,6 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Success; -import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberExited; import akka.cluster.ClusterEvent.MemberRemoved; @@ -41,8 +38,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; @@ -93,16 +90,11 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore private final ActorContext actorContext; private final ShardingServiceAddressResolver resolver; - private final DistributedDataStore distributedConfigDatastore; - private final DistributedDataStore distributedOperDatastore; + private final AbstractDataStore distributedConfigDatastore; + private final AbstractDataStore distributedOperDatastore; private final int lookupTaskMaxRetries; private final Map idToProducer = new HashMap<>(); - private final Map idToShardRegistration = new HashMap<>(); - - private final Cluster cluster; - - private Map currentConfiguration = new HashMap<>(); ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) { LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName()); @@ -118,7 +110,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName()); clusterWrapper.subscribeToMemberEvents(self()); - cluster = Cluster.get(actorSystem); } @Override @@ -126,12 +117,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - protected void handleRecover(final Object message) throws Exception { + protected void handleRecover(final Object message) { LOG.debug("Received a recover message {}", message); } @Override - protected void handleCommand(final Object message) throws Exception { + protected void handleCommand(final Object message) { LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message); if (message instanceof ClusterEvent.MemberUp) { memberUp((ClusterEvent.MemberUp) message); @@ -226,7 +217,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { // fastpath if we have no peers if (resolver.getShardingServicePeerActorAddresses().isEmpty()) { - getSender().tell(new Status.Success(null), noSender()); + getSender().tell(new Status.Success(null), ActorRef.noSender()); } final ActorRef sender = getSender(); @@ -246,12 +237,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final CompletableFuture combinedFuture = CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()])); - combinedFuture.thenRun(() -> { - sender.tell(new Status.Success(null), noSender()); - }).exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); + combinedFuture + .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender())) + .exceptionally(throwable -> { + sender.tell(new Status.Failure(throwable), self()); + return null; + }); } private void onNotifyProducerCreated(final NotifyProducerCreated message) { @@ -302,16 +293,16 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next()); if (registration == null) { LOG.warn("The notification contained a path on which no producer is registered, throwing away"); - getSender().tell(new Status.Success(null), noSender()); + getSender().tell(new Status.Success(null), ActorRef.noSender()); return; } try { registration.close(); - getSender().tell(new Status.Success(null), noSender()); + getSender().tell(new Status.Success(null), ActorRef.noSender()); } catch (final DOMDataTreeProducerException e) { LOG.error("Unable to close producer", e); - getSender().tell(new Status.Failure(e), noSender()); + getSender().tell(new Status.Failure(e), ActorRef.noSender()); } } @@ -364,7 +355,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { // schedule a notification task for the reply actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, new ConfigShardLookupTask( - actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries), + actorSystem, getSender(), context, message, lookupTaskMaxRetries), actorSystem.dispatcher()); } @@ -446,7 +437,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { localShardFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable { + public void onComplete(Throwable throwable, ActorRef actorRef) { if (throwable != null) { tryReschedule(throwable); } else { @@ -510,7 +501,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { ask.onComplete(new OnComplete() { @Override - public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { + public void onComplete(final Throwable throwable, final Object findLeaderReply) { if (throwable != null) { tryReschedule(throwable); } else { @@ -573,7 +564,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { shardingService.lookupShardFrontend(toLookup); if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) { - replyTo.tell(new Success(null), noSender()); + replyTo.tell(new Success(null), ActorRef.noSender()); } else { tryReschedule(null); } @@ -634,12 +625,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { localShardFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable { + public void onComplete(Throwable throwable, ActorRef actorRef) { if (throwable != null) { //TODO Shouldn't we check why findLocalShard failed? LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future", toLookup); - replyTo.tell(new Success(null), noSender()); + replyTo.tell(new Success(null), ActorRef.noSender()); } else { tryReschedule(null); } @@ -664,21 +655,16 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; private final ActorContext context; - private final ClusterWrapper clusterWrapper; - private final int lookupTaskMaxRetries; ConfigShardLookupTask(final ActorSystem system, final ActorRef replyTo, final ActorContext context, - final ClusterWrapper clusterWrapper, final StartConfigShardLookup message, final int lookupMaxRetries) { super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.context = context; - this.clusterWrapper = clusterWrapper; - this.lookupTaskMaxRetries = lookupMaxRetries; } @Override @@ -696,12 +682,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { if (!localShard.isPresent()) { tryReschedule(null); } else { - LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup.."); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, - new ConfigShardReadinessTask( - system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries), - system.dispatcher()); + LOG.debug("Local backend for prefix configuration shard lookup successful"); + replyTo.tell(new Status.Success(null), ActorRef.noSender()); } } } @@ -745,7 +727,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { ask.onComplete(new OnComplete() { @Override - public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { + public void onComplete(final Throwable throwable, final Object findLeaderReply) { if (throwable != null) { tryReschedule(throwable); } else { @@ -755,7 +737,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { // leader is found, backend seems ready, check if the frontend is ready LOG.debug("{} - Leader for config shard is ready. Ending lookup.", clusterWrapper.getCurrentMemberName()); - replyTo.tell(new Status.Success(null), noSender()); + replyTo.tell(new Status.Success(null), ActorRef.noSender()); } else { tryReschedule(null); } @@ -768,8 +750,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { public static class ShardedDataTreeActorCreator { private DistributedShardedDOMDataTree shardingService; - private DistributedDataStore distributedConfigDatastore; - private DistributedDataStore distributedOperDatastore; + private AbstractDataStore distributedConfigDatastore; + private AbstractDataStore distributedOperDatastore; private ActorSystem actorSystem; private ClusterWrapper cluster; private int maxRetries; @@ -792,8 +774,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return this; } - public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) { - this.cluster = cluster; + public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) { + this.cluster = clusterWrapper; return this; } @@ -801,28 +783,28 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return cluster; } - public DistributedDataStore getDistributedConfigDatastore() { + public AbstractDataStore getDistributedConfigDatastore() { return distributedConfigDatastore; } public ShardedDataTreeActorCreator setDistributedConfigDatastore( - final DistributedDataStore distributedConfigDatastore) { + final AbstractDataStore distributedConfigDatastore) { this.distributedConfigDatastore = distributedConfigDatastore; return this; } - public DistributedDataStore getDistributedOperDatastore() { + public AbstractDataStore getDistributedOperDatastore() { return distributedOperDatastore; } public ShardedDataTreeActorCreator setDistributedOperDatastore( - final DistributedDataStore distributedOperDatastore) { + final AbstractDataStore distributedOperDatastore) { this.distributedOperDatastore = distributedOperDatastore; return this; } - public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) { - this.maxRetries = maxRetries; + public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) { + this.maxRetries = newMaxRetries; return this; }