Add optional timeout parameter for backup rpc
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index e1752cc57d5c8347948887c86c06fb9162e93ca8..d1b9aba9d666268c6ab159f11e543ebc674fefe5 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import static akka.pattern.Patterns.ask;
+import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
 import akka.actor.Address;
@@ -34,7 +35,7 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -115,7 +116,7 @@ import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -137,7 +138,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // Stores a mapping between a shard name and it's corresponding information
     // Shard names look like inventory, topology etc and are as specified in
     // configuration
-    private final Map<String, ShardInformation> localShards = new HashMap<>();
+    @VisibleForTesting
+    final Map<String, ShardInformation> localShards = new HashMap<>();
 
     // The type of a ShardManager reflects the type of the datastore itself
     // A data store could be of type config/operational
@@ -147,7 +149,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Configuration configuration;
 
-    private final String shardDispatcherPath;
+    @VisibleForTesting
+    final String shardDispatcherPath;
 
     private final ShardManagerInfo shardManagerMBean;
 
@@ -157,9 +160,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-    private final ShardPeerAddressResolver peerAddressResolver;
+    @VisibleForTesting
+    final ShardPeerAddressResolver peerAddressResolver;
 
-    private SchemaContext schemaContext;
+    private EffectiveModelContext schemaContext;
 
     private DatastoreSnapshot restoreFromSnapshot;
 
@@ -275,7 +279,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot();
+            onGetSnapshot((GetSnapshot) message);
         } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof ChangeShardMembersVotingStatus) {
@@ -353,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         configUpdateHandler.initListener(dataStore, datastoreType);
     }
 
-    private void onShutDown() {
+    void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
@@ -422,6 +426,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
                                           final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -442,7 +448,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -463,6 +469,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
             final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -483,7 +491,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
                 new RemoveServer(shardId.toString()), removeServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -527,7 +535,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
 
-            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
+            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Boolean result) {
                     if (failure == null) {
@@ -555,7 +563,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         persistShardList();
     }
 
-    private void onGetSnapshot() {
+    private void onGetSnapshot(final GetSnapshot getSnapshot) {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
@@ -580,7 +588,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for (ShardInformation shardInfo: localShards.values()) {
-            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+            shardInfo.getActor().tell(getSnapshot, replyActor);
         }
     }
 
@@ -1133,7 +1141,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param message the message to send
      */
     private void updateSchemaContext(final Object message) {
-        schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+        schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext();
 
         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
 
@@ -1226,7 +1234,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1279,18 +1287,27 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
 
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
-                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
-                        shardSnapshots.get(shardName)), peerAddressResolver));
+            localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), shardSnapshots));
         }
     }
 
+    @VisibleForTesting
+    ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
+                                        final Map<String, String> peerAddresses,
+                                        final DatastoreContext datastoreContext,
+                                        final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+        return new ShardInformation(shardName, shardId, peerAddresses,
+                datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+                peerAddressResolver);
+    }
+
     /**
      * Given the name of the shard find the addresses of all it's peers.
      *
      * @param shardName the shard name
      */
-    private Map<String, String> getPeerAddresses(final String shardName) {
+    Map<String, String> getPeerAddresses(final String shardName) {
         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
         return getPeerAddresses(shardName, members);
     }
@@ -1416,12 +1433,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
         LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
         sender.tell(new Status.Failure(new AlreadyExistsException(
             String.format("Local shard %s already exists", shardName))), getSelf());
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
                                 final RemotePrimaryShardFound response, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
@@ -1456,6 +1477,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
     }
 
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
@@ -1505,7 +1528,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object addServerResponse) {
                 if (failure != null) {
@@ -1709,7 +1732,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
                     Timeout.apply(30, TimeUnit.SECONDS));
 
-            future.onComplete(new OnComplete<Object>() {
+            future.onComplete(new OnComplete<>() {
                 @Override
                 public void onComplete(final Throwable failure, final Object response) {
                     if (failure != null) {
@@ -1758,7 +1781,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 .getShardInitializationTimeout().duration().$times(2));
 
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
@@ -1804,7 +1827,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
 
-        futureObj.onComplete(new OnComplete<Object>() {
+        futureObj.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object response) {
                 shardReplicaOperationsInProgress.remove(shardName);
@@ -1952,10 +1975,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
          */
         protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
                 final String persistenceId, final ActorRef shardManagerActor) {
-            this.targetActor = Preconditions.checkNotNull(targetActor);
-            this.shardName = Preconditions.checkNotNull(shardName);
-            this.persistenceId = Preconditions.checkNotNull(persistenceId);
-            this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+            this.targetActor = requireNonNull(targetActor);
+            this.shardName = requireNonNull(shardName);
+            this.persistenceId = requireNonNull(persistenceId);
+            this.shardManagerActor = requireNonNull(shardManagerActor);
         }
 
         public ActorRef getTargetActor() {