X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=f7c0d223072860149a57c3f91f216465963ac779;hb=refs%2Fchanges%2F70%2F91770%2F4;hp=3efbbabeb8fb1a6335995bf236d86098f29093be;hpb=8232a626b43fdd2f5799da0fbcfb0f02d3c8f4fb;p=controller.git

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
index 3efbbabeb8..f7c0d22307 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
@@ -5,10 +5,9 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.sharding;
 
-import static akka.actor.ActorRef.noSender;
+import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -28,22 +27,21 @@ import akka.cluster.Member;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
@@ -90,10 +88,10 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private final ClusterWrapper clusterWrapper;
     // helper actorContext used only for static calls to executeAsync etc
     // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
-    private final ActorContext actorContext;
+    private final ActorUtils actorUtils;
     private final ShardingServiceAddressResolver resolver;
-    private final AbstractDataStore distributedConfigDatastore;
-    private final AbstractDataStore distributedOperDatastore;
+    private final DistributedDataStoreInterface distributedConfigDatastore;
+    private final DistributedDataStoreInterface distributedOperDatastore;
     private final int lookupTaskMaxRetries;
 
     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
@@ -107,7 +105,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         distributedConfigDatastore = builder.getDistributedConfigDatastore();
         distributedOperDatastore = builder.getDistributedOperDatastore();
         lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
-        actorContext = distributedConfigDatastore.getActorContext();
+        actorUtils = distributedConfigDatastore.getActorUtils();
         resolver = new ShardingServiceAddressResolver(
                 DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
 
@@ -119,12 +117,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     }
 
     @Override
-    protected void handleRecover(final Object message) throws Exception {
+    protected void handleRecover(final Object message) {
         LOG.debug("Received a recover message {}", message);
     }
 
     @Override
-    protected void handleCommand(final Object message) throws Exception {
+    protected void handleCommand(final Object message) {
         LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
         if (message instanceof ClusterEvent.MemberUp) {
             memberUp((ClusterEvent.MemberUp) message);
@@ -219,7 +217,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
         // fastpath if we have no peers
         if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
-            getSender().tell(new Status.Success(null), noSender());
+            getSender().tell(new Status.Success(null), ActorRef.noSender());
         }
 
         final ActorRef sender = getSender();
@@ -231,7 +229,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             final ActorSelection actorSelection = actorSystem.actorSelection(address);
             futures.add(
                     FutureConverters.toJava(
-                            actorContext.executeOperationAsync(
+                            actorUtils.executeOperationAsync(
                                     actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
                             .toCompletableFuture());
         }
@@ -240,7 +238,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                 futures.toArray(new CompletableFuture[futures.size()]));
 
         combinedFuture
-                .thenRun(() -> sender.tell(new Success(null), noSender()))
+                .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
                 .exceptionally(throwable -> {
                     sender.tell(new Status.Failure(throwable), self());
                     return null;
@@ -271,7 +269,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             final ActorSelection selection = actorSystem.actorSelection(address);
 
             futures.add(FutureConverters.toJava(
-                    actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
+                    actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                     .toCompletableFuture());
         }
 
@@ -295,16 +293,16 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
         final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
         if (registration == null) {
             LOG.warn("The notification contained a path on which no producer is registered, throwing away");
-            getSender().tell(new Status.Success(null), noSender());
+            getSender().tell(new Status.Success(null), ActorRef.noSender());
             return;
         }
 
         try {
             registration.close();
-            getSender().tell(new Status.Success(null), noSender());
+            getSender().tell(new Status.Success(null), ActorRef.noSender());
         } catch (final DOMDataTreeProducerException e) {
             LOG.error("Unable to close producer", e);
-            getSender().tell(new Status.Failure(e), noSender());
+            getSender().tell(new Status.Failure(e), ActorRef.noSender());
         }
     }
@@ -314,13 +312,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
         final DOMDataTreeIdentifier prefix = message.getPrefix();
 
-        final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
-                ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
+        final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
+                ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();
 
         // schedule a notification task for the reply
         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                 new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
-                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
+                        utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
     }
 
     private void onPrefixShardCreated(final PrefixShardCreated message) {
@@ -336,7 +334,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
         final ShardRemovalLookupTask removalTask =
                 new ShardRemovalLookupTask(actorSystem, getSender(),
-                        actorContext, message.getPrefix(), lookupTaskMaxRetries);
+                        actorUtils, message.getPrefix(), lookupTaskMaxRetries);
 
         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
     }
@@ -350,9 +348,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private void onStartConfigShardLookup(final StartConfigShardLookup message) {
         LOG.debug("Received StartConfigShardLookup: {}", message);
 
-        final ActorContext context =
+        final ActorUtils context =
                 message.getType().equals(LogicalDatastoreType.CONFIGURATION)
-                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
+                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();
 
         // schedule a notification task for the reply
         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
@@ -410,7 +408,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         private final ActorSystem system;
         private final ActorRef replyTo;
         private final ClusterWrapper clusterWrapper;
-        private final ActorContext context;
+        private final ActorUtils context;
         private final DistributedShardedDOMDataTree shardingService;
         private final DOMDataTreeIdentifier toLookup;
         private final int lookupMaxRetries;
@@ -418,7 +416,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         ShardCreationLookupTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ClusterWrapper clusterWrapper,
-                                final ActorContext context,
+                                final ActorUtils context,
                                 final DistributedShardedDOMDataTree shardingService,
                                 final DOMDataTreeIdentifier toLookup,
                                 final int lookupMaxRetries) {
@@ -439,7 +437,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
             localShardFuture.onComplete(new OnComplete<ActorRef>() {
                 @Override
-                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                     if (throwable != null) {
                         tryReschedule(throwable);
                     } else {
@@ -456,7 +454,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
 
         @Override
-        void reschedule(int retries) {
+        void reschedule(final int retries) {
             LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
             system.scheduler().scheduleOnce(
                     SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
@@ -470,7 +468,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
         private final ActorSystem system;
         private final ActorRef replyTo;
-        private final ActorContext context;
+        private final ActorUtils context;
         private final ClusterWrapper clusterWrapper;
         private final ActorRef shard;
         private final DistributedShardedDOMDataTree shardingService;
@@ -479,7 +477,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
         ShardLeaderLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
-                              final ActorContext context,
+                              final ActorUtils context,
                               final ClusterWrapper clusterWrapper,
                               final ActorRef shard,
                               final DistributedShardedDOMDataTree shardingService,
@@ -501,14 +499,14 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
             final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
 
-            ask.onComplete(new OnComplete<Object>() {
+            ask.onComplete(new OnComplete<>() {
                 @Override
-                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                     if (throwable != null) {
                         tryReschedule(throwable);
                     } else {
                         final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
-                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+                        final Optional<String> leaderActor = findLeader.getLeaderActor();
                         if (leaderActor.isPresent()) {
                             // leader is found, backend seems ready, check if the frontend is ready
                             LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
@@ -528,7 +526,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
 
         @Override
-        void reschedule(int retries) {
+        void reschedule(final int retries) {
             LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
                     clusterWrapper.getCurrentMemberName(), toLookup, retries);
             system.scheduler().scheduleOnce(
@@ -566,7 +564,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                     shardingService.lookupShardFrontend(toLookup);
 
             if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
-                replyTo.tell(new Success(null), noSender());
+                replyTo.tell(new Success(null), ActorRef.noSender());
             } else {
                 tryReschedule(null);
             }
@@ -578,7 +576,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                 return false;
             }
 
-            if (YangInstanceIdentifier.EMPTY.equals(prefix.getRootIdentifier())) {
+            if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) {
                 return true;
             }
 
@@ -590,7 +588,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
 
         @Override
-        void reschedule(int retries) {
+        void reschedule(final int retries) {
             LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
             system.scheduler().scheduleOnce(
                     SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
@@ -605,12 +603,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
         private final ActorSystem system;
         private final ActorRef replyTo;
-        private final ActorContext context;
+        private final ActorUtils context;
         private final DOMDataTreeIdentifier toLookup;
 
         ShardRemovalLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
-                               final ActorContext context,
+                               final ActorUtils context,
                                final DOMDataTreeIdentifier toLookup,
                                final int lookupMaxRetries) {
             super(replyTo, lookupMaxRetries);
@@ -627,12 +625,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
             localShardFuture.onComplete(new OnComplete<ActorRef>() {
                 @Override
-                public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
                     if (throwable != null) {
                         //TODO Shouldn't we check why findLocalShard failed?
LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future", toLookup); - replyTo.tell(new Success(null), noSender()); + replyTo.tell(new Success(null), ActorRef.noSender()); } else { tryReschedule(null); } @@ -641,7 +639,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..", toLookup, retries); system.scheduler().scheduleOnce( @@ -656,11 +654,11 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; - private final ActorContext context; + private final ActorUtils context; ConfigShardLookupTask(final ActorSystem system, final ActorRef replyTo, - final ActorContext context, + final ActorUtils context, final StartConfigShardLookup message, final int lookupMaxRetries) { super(replyTo, lookupMaxRetries); @@ -670,7 +668,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries); system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher()); @@ -685,7 +683,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { tryReschedule(null); } else { LOG.debug("Local backend for prefix configuration shard lookup successful"); - replyTo.tell(new Status.Success(null), noSender()); + replyTo.tell(new Status.Success(null), ActorRef.noSender()); } } } @@ -697,13 +695,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorSystem system; private final ActorRef replyTo; - private final ActorContext context; + private final ActorUtils context; private final ClusterWrapper clusterWrapper; private final ActorRef shard; ConfigShardReadinessTask(final ActorSystem system, final ActorRef replyTo, - final ActorContext context, + final ActorUtils context, final ClusterWrapper clusterWrapper, final ActorRef shard, final int lookupMaxRetries) { @@ -716,7 +714,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @Override - void reschedule(int retries) { + void reschedule(final int retries) { LOG.debug("{} - Leader for config shard not found on try: {}, retrying..", clusterWrapper.getCurrentMemberName(), retries); system.scheduler().scheduleOnce( @@ -727,19 +725,19 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { public void run() { final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); - ask.onComplete(new OnComplete() { + ask.onComplete(new OnComplete<>() { @Override - public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { + public void onComplete(final Throwable throwable, final Object findLeaderReply) { if (throwable != null) { tryReschedule(throwable); } else { final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; - final java.util.Optional leaderActor = findLeader.getLeaderActor(); + final Optional leaderActor = findLeader.getLeaderActor(); if (leaderActor.isPresent()) { // leader is found, backend seems ready, check if the frontend is ready LOG.debug("{} - Leader for config shard is ready. 
Ending lookup.", clusterWrapper.getCurrentMemberName()); - replyTo.tell(new Status.Success(null), noSender()); + replyTo.tell(new Status.Success(null), ActorRef.noSender()); } else { tryReschedule(null); } @@ -752,8 +750,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { public static class ShardedDataTreeActorCreator { private DistributedShardedDOMDataTree shardingService; - private AbstractDataStore distributedConfigDatastore; - private AbstractDataStore distributedOperDatastore; + private DistributedDataStoreInterface distributedConfigDatastore; + private DistributedDataStoreInterface distributedOperDatastore; private ActorSystem actorSystem; private ClusterWrapper cluster; private int maxRetries; @@ -776,8 +774,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return this; } - public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) { - this.cluster = cluster; + public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) { + this.cluster = clusterWrapper; return this; } @@ -785,28 +783,28 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return cluster; } - public AbstractDataStore getDistributedConfigDatastore() { + public DistributedDataStoreInterface getDistributedConfigDatastore() { return distributedConfigDatastore; } public ShardedDataTreeActorCreator setDistributedConfigDatastore( - final AbstractDataStore distributedConfigDatastore) { + final DistributedDataStoreInterface distributedConfigDatastore) { this.distributedConfigDatastore = distributedConfigDatastore; return this; } - public AbstractDataStore getDistributedOperDatastore() { + public DistributedDataStoreInterface getDistributedOperDatastore() { return distributedOperDatastore; } public ShardedDataTreeActorCreator setDistributedOperDatastore( - final AbstractDataStore distributedOperDatastore) { + final DistributedDataStoreInterface distributedOperDatastore) { this.distributedOperDatastore = distributedOperDatastore; return this; } - public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) { - this.maxRetries = maxRetries; + public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) { + this.maxRetries = newMaxRetries; return this; } @@ -815,11 +813,11 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } private void verify() { - Preconditions.checkNotNull(shardingService); - Preconditions.checkNotNull(actorSystem); - Preconditions.checkNotNull(cluster); - Preconditions.checkNotNull(distributedConfigDatastore); - Preconditions.checkNotNull(distributedOperDatastore); + requireNonNull(shardingService); + requireNonNull(actorSystem); + requireNonNull(cluster); + requireNonNull(distributedConfigDatastore); + requireNonNull(distributedOperDatastore); } public Props props() {