X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=52c3d25faa18ef1de438d06e0c468f05998bf1a0;hb=refs%2Fchanges%2F61%2F95161%2F6;hp=671fbb89652b42ed1b34a2623346715ef6d559c1;hpb=ac919f21651e87b9652d02d7924f53e7e2b30471;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index 671fbb8965..52c3d25faa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -5,10 +5,9 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.sharding; -import static akka.actor.ActorRef.noSender; +import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -17,7 +16,6 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Success; -import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberExited; import akka.cluster.ClusterEvent.MemberRemoved; @@ -29,22 +27,21 @@ import akka.cluster.Member; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; -import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; +import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; @@ -77,6 +74,7 @@ import scala.concurrent.duration.FiniteDuration; * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote * nodes of newly open producers/shards on the local node. */ +@Deprecated(forRemoval = true) public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class); @@ -91,18 +89,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ClusterWrapper clusterWrapper; // helper actorContext used only for static calls to executeAsync etc // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final ShardingServiceAddressResolver resolver; - private final AbstractDataStore distributedConfigDatastore; - private final AbstractDataStore distributedOperDatastore; + private final DistributedDataStoreInterface distributedConfigDatastore; + private final DistributedDataStoreInterface distributedOperDatastore; private final int lookupTaskMaxRetries; private final Map idToProducer = new HashMap<>(); - private final Map idToShardRegistration = new HashMap<>(); - - private final Cluster cluster; - - private Map currentConfiguration = new HashMap<>(); ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) { LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName()); @@ -113,12 +106,11 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { distributedConfigDatastore = builder.getDistributedConfigDatastore(); distributedOperDatastore = builder.getDistributedOperDatastore(); lookupTaskMaxRetries = builder.getLookupTaskMaxRetries(); - actorContext = distributedConfigDatastore.getActorContext(); + actorUtils = distributedConfigDatastore.getActorUtils(); resolver = new ShardingServiceAddressResolver( DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName()); clusterWrapper.subscribeToMemberEvents(self()); - cluster = Cluster.get(actorSystem); } @Override @@ -126,12 +118,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - protected void handleRecover(final Object message) throws Exception { + protected void handleRecover(final Object message) { LOG.debug("Received a recover message {}", message); } @Override - protected void handleCommand(final Object message) throws Exception { + protected void handleCommand(final Object message) { LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message); if (message instanceof ClusterEvent.MemberUp) { memberUp((ClusterEvent.MemberUp) message); @@ -226,7 +218,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { // fastpath if we have no peers if (resolver.getShardingServicePeerActorAddresses().isEmpty()) { - getSender().tell(new Status.Success(null), noSender()); + getSender().tell(new Status.Success(null), ActorRef.noSender()); } final ActorRef sender = getSender(); @@ -238,7 +230,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ActorSelection actorSelection = actorSystem.actorSelection(address); futures.add( FutureConverters.toJava( - actorContext.executeOperationAsync( + actorUtils.executeOperationAsync( actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT)) .toCompletableFuture()); } @@ -246,12 +238,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final CompletableFuture combinedFuture = CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()])); - combinedFuture.thenRun(() -> { - sender.tell(new Status.Success(null), noSender()); - }).exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); + combinedFuture + .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender())) + .exceptionally(throwable -> { + sender.tell(new Status.Failure(throwable), self()); + return null; + }); } private void onNotifyProducerCreated(final NotifyProducerCreated message) { @@ -278,7 +270,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ActorSelection selection = actorSystem.actorSelection(address); futures.add(FutureConverters.toJava( - actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees()))) + actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees()))) .toCompletableFuture()); } @@ -302,16 +294,16 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next()); if (registration == null) { LOG.warn("The notification contained a path on which no producer is registered, throwing away"); - getSender().tell(new Status.Success(null), noSender()); + getSender().tell(new Status.Success(null), ActorRef.noSender()); return; } try { registration.close(); - getSender().tell(new Status.Success(null), noSender()); + getSender().tell(new Status.Success(null), ActorRef.noSender()); } catch (final DOMDataTreeProducerException e) { LOG.error("Unable to close producer", e); - getSender().tell(new Status.Failure(e), noSender()); + getSender().tell(new Status.Failure(e), ActorRef.noSender()); } } @@ -321,13 +313,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final DOMDataTreeIdentifier prefix = message.getPrefix(); - final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION - ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext(); + final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION + ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils(); // schedule a notification task for the reply actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper, - context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher()); + utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher()); } private void onPrefixShardCreated(final PrefixShardCreated message) { @@ -343,7 +335,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ShardRemovalLookupTask removalTask = new ShardRemovalLookupTask(actorSystem, getSender(), - actorContext, message.getPrefix(), lookupTaskMaxRetries); + actorUtils, message.getPrefix(), lookupTaskMaxRetries); actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher()); } @@ -357,14 +349,14 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private void onStartConfigShardLookup(final StartConfigShardLookup message) { LOG.debug("Received StartConfigShardLookup: {}", message); - final ActorContext context = + final ActorUtils context = message.getType().equals(LogicalDatastoreType.CONFIGURATION) - ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext(); + ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils(); // schedule a notification task for the reply actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, new ConfigShardLookupTask( - actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries), + actorSystem, getSender(), context, message, lookupTaskMaxRetries), actorSystem.dispatcher()); } @@ -417,7 +409,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; private final ClusterWrapper clusterWrapper; - private final ActorContext context; + private final ActorUtils context; private final DistributedShardedDOMDataTree shardingService; private final DOMDataTreeIdentifier toLookup; private final int lookupMaxRetries; @@ -425,7 +417,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { ShardCreationLookupTask(final ActorSystem system, final ActorRef replyTo, final ClusterWrapper clusterWrapper, - final ActorContext context, + final ActorUtils context, final DistributedShardedDOMDataTree shardingService, final DOMDataTreeIdentifier toLookup, final int lookupMaxRetries) { @@ -446,7 +438,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { localShardFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable { + public void onComplete(final Throwable throwable, final ActorRef actorRef) { if (throwable != null) { tryReschedule(throwable); } else { @@ -463,7 +455,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries); system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher()); @@ -477,7 +469,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; - private final ActorContext context; + private final ActorUtils context; private final ClusterWrapper clusterWrapper; private final ActorRef shard; private final DistributedShardedDOMDataTree shardingService; @@ -486,7 +478,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { ShardLeaderLookupTask(final ActorSystem system, final ActorRef replyTo, - final ActorContext context, + final ActorUtils context, final ClusterWrapper clusterWrapper, final ActorRef shard, final DistributedShardedDOMDataTree shardingService, @@ -508,14 +500,14 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); - ask.onComplete(new OnComplete() { + ask.onComplete(new OnComplete<>() { @Override - public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { + public void onComplete(final Throwable throwable, final Object findLeaderReply) { if (throwable != null) { tryReschedule(throwable); } else { final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; - final java.util.Optional leaderActor = findLeader.getLeaderActor(); + final Optional leaderActor = findLeader.getLeaderActor(); if (leaderActor.isPresent()) { // leader is found, backend seems ready, check if the frontend is ready LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..", @@ -535,7 +527,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..", clusterWrapper.getCurrentMemberName(), toLookup, retries); system.scheduler().scheduleOnce( @@ -573,7 +565,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { shardingService.lookupShardFrontend(toLookup); if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) { - replyTo.tell(new Success(null), noSender()); + replyTo.tell(new Success(null), ActorRef.noSender()); } else { tryReschedule(null); } @@ -585,7 +577,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return false; } - if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) { + if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) { return true; } @@ -597,7 +589,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries); system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher()); @@ -612,12 +604,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; - private final ActorContext context; + private final ActorUtils context; private final DOMDataTreeIdentifier toLookup; ShardRemovalLookupTask(final ActorSystem system, final ActorRef replyTo, - final ActorContext context, + final ActorUtils context, final DOMDataTreeIdentifier toLookup, final int lookupMaxRetries) { super(replyTo, lookupMaxRetries); @@ -634,12 +626,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { localShardFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable { + public void onComplete(final Throwable throwable, final ActorRef actorRef) { if (throwable != null) { //TODO Shouldn't we check why findLocalShard failed? LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future", toLookup); - replyTo.tell(new Success(null), noSender()); + replyTo.tell(new Success(null), ActorRef.noSender()); } else { tryReschedule(null); } @@ -648,7 +640,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..", toLookup, retries); system.scheduler().scheduleOnce( @@ -663,26 +655,21 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; - private final ActorContext context; - private final ClusterWrapper clusterWrapper; - private final int lookupTaskMaxRetries; + private final ActorUtils context; ConfigShardLookupTask(final ActorSystem system, final ActorRef replyTo, - final ActorContext context, - final ClusterWrapper clusterWrapper, + final ActorUtils context, final StartConfigShardLookup message, final int lookupMaxRetries) { super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.context = context; - this.clusterWrapper = clusterWrapper; - this.lookupTaskMaxRetries = lookupMaxRetries; } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries); system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher()); @@ -696,12 +683,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { if (!localShard.isPresent()) { tryReschedule(null); } else { - LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup.."); - system.scheduler().scheduleOnce( - SHARD_LOOKUP_TASK_INTERVAL, - new ConfigShardReadinessTask( - system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries), - system.dispatcher()); + LOG.debug("Local backend for prefix configuration shard lookup successful"); + replyTo.tell(new Status.Success(null), ActorRef.noSender()); } } } @@ -713,13 +696,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; - private final ActorContext context; + private final ActorUtils context; private final ClusterWrapper clusterWrapper; private final ActorRef shard; ConfigShardReadinessTask(final ActorSystem system, final ActorRef replyTo, - final ActorContext context, + final ActorUtils context, final ClusterWrapper clusterWrapper, final ActorRef shard, final int lookupMaxRetries) { @@ -732,7 +715,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("{} - Leader for config shard not found on try: {}, retrying..", clusterWrapper.getCurrentMemberName(), retries); system.scheduler().scheduleOnce( @@ -743,19 +726,19 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { public void run() { final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); - ask.onComplete(new OnComplete() { + ask.onComplete(new OnComplete<>() { @Override - public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { + public void onComplete(final Throwable throwable, final Object findLeaderReply) { if (throwable != null) { tryReschedule(throwable); } else { final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; - final java.util.Optional leaderActor = findLeader.getLeaderActor(); + final Optional leaderActor = findLeader.getLeaderActor(); if (leaderActor.isPresent()) { // leader is found, backend seems ready, check if the frontend is ready LOG.debug("{} - Leader for config shard is ready. Ending lookup.", clusterWrapper.getCurrentMemberName()); - replyTo.tell(new Status.Success(null), noSender()); + replyTo.tell(new Status.Success(null), ActorRef.noSender()); } else { tryReschedule(null); } @@ -768,8 +751,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { public static class ShardedDataTreeActorCreator { private DistributedShardedDOMDataTree shardingService; - private AbstractDataStore distributedConfigDatastore; - private AbstractDataStore distributedOperDatastore; + private DistributedDataStoreInterface distributedConfigDatastore; + private DistributedDataStoreInterface distributedOperDatastore; private ActorSystem actorSystem; private ClusterWrapper cluster; private int maxRetries; @@ -792,8 +775,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return this; } - public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) { - this.cluster = cluster; + public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) { + this.cluster = clusterWrapper; return this; } @@ -801,28 +784,28 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return cluster; } - public AbstractDataStore getDistributedConfigDatastore() { + public DistributedDataStoreInterface getDistributedConfigDatastore() { return distributedConfigDatastore; } public ShardedDataTreeActorCreator setDistributedConfigDatastore( - final AbstractDataStore distributedConfigDatastore) { + final DistributedDataStoreInterface distributedConfigDatastore) { this.distributedConfigDatastore = distributedConfigDatastore; return this; } - public AbstractDataStore getDistributedOperDatastore() { + public DistributedDataStoreInterface getDistributedOperDatastore() { return distributedOperDatastore; } public ShardedDataTreeActorCreator setDistributedOperDatastore( - final AbstractDataStore distributedOperDatastore) { + final DistributedDataStoreInterface distributedOperDatastore) { this.distributedOperDatastore = distributedOperDatastore; return this; } - public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) { - this.maxRetries = maxRetries; + public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) { + this.maxRetries = newMaxRetries; return this; } @@ -831,11 +814,11 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } private void verify() { - Preconditions.checkNotNull(shardingService); - Preconditions.checkNotNull(actorSystem); - Preconditions.checkNotNull(cluster); - Preconditions.checkNotNull(distributedConfigDatastore); - Preconditions.checkNotNull(distributedOperDatastore); + requireNonNull(shardingService); + requireNonNull(actorSystem); + requireNonNull(cluster); + requireNonNull(distributedConfigDatastore); + requireNonNull(distributedOperDatastore); } public Props props() {