Add optional timeout parameter for backup rpc 05/89905/8
authorTomas Cere <tomas.cere@pantheon.tech>
Fri, 29 Mar 2019 11:38:10 +0000 (12:38 +0100)
committerRobert Varga <nite@hq.sk>
Sun, 26 Jul 2020 08:20:58 +0000 (08:20 +0000)
Once the snapshot size grows large the backup rpc can take more
time than the default timeout value causing it to fail.
Add the option to override the timeout in the backup rpc.

Change-Id: I878066668f45abcfe758a7b90d34576bff1b7db0
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java
opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang
opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java

index 0fe9a1acc3126d316392ac34cce332f7c5b733a2..60fca8c55f6aa8c45f5baddd12ff7d8ccc80830c 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
 import java.util.Optional;
@@ -69,7 +70,7 @@ class RaftActorSnapshotMessageSupport {
         } else if (COMMIT_SNAPSHOT.equals(message)) {
             context.getSnapshotManager().commit(-1, -1);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot(sender);
+            onGetSnapshot(sender, (GetSnapshot) message);
         } else {
             return false;
         }
@@ -105,16 +106,20 @@ class RaftActorSnapshotMessageSupport {
         context.getSnapshotManager().apply(message);
     }
 
-    private void onGetSnapshot(ActorRef sender) {
+    private void onGetSnapshot(ActorRef sender, GetSnapshot getSnapshot) {
         log.debug("{}: onGetSnapshot", context.getId());
 
+
         if (context.getPersistenceProvider().isRecoveryApplicable()) {
             CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
                     context.getReplicatedLog().last(), -1, true);
 
+            final FiniteDuration timeout =
+                    getSnapshot.getTimeout().map(Timeout::duration).orElse(snapshotReplyActorTimeout);
+
             ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
-                    ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
-                    snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true)));
+                    ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, timeout, context.getId(),
+                    context.getPeerServerInfo(true)));
 
             cohort.createSnapshot(snapshotReplyActor, Optional.empty());
         } else {
index d51f93f7eed780e4d5991ae37dce33dadbb98fcd..c9bc2ed3b2174451921a93fb33d6d135bbdf0bf4 100644 (file)
@@ -7,15 +7,25 @@
  */
 package org.opendaylight.controller.cluster.raft.client.messages;
 
+import akka.util.Timeout;
+import java.util.Optional;
+
 /**
- * Internal client message to get a snapshot of the current state based on whether or not persistence is
- * enabled. Returns a {@link GetSnapshotReply} instance.
+ * Internal client message to get a snapshot of the current state based on whether or not persistence is enabled.
+ * Returns a {@link GetSnapshotReply} instance.
  *
  * @author Thomas Pantelis
  */
 public final class GetSnapshot {
-    public static final GetSnapshot INSTANCE = new GetSnapshot();
+    public static final GetSnapshot INSTANCE = new GetSnapshot(null);
+
+    private final Timeout timeout;
+
+    public GetSnapshot(final Timeout timeout) {
+        this.timeout = timeout;
+    }
 
-    private GetSnapshot() {
+    public Optional<Timeout> getTimeout() {
+        return Optional.ofNullable(timeout);
     }
 }
index 109845c86f6420b03ca1040487391d979f4053b8..260623f487f115e904fdccf3ba6d6c4414d5fad1 100644 (file)
@@ -232,6 +232,15 @@ module cluster-admin {
               type string;
               description "The path and name of the file in which to store the backup.";
             }
+
+            leaf timeout {
+              type uint32 {
+                range 1..max;
+              }
+              units "seconds";
+              description "Optional timeout in seconds for the backup operation which will override all the different
+                           timeouts that are being hit on the backend.";
+            }
         }
 
         description "Creates a backup file of the datastore state";
index b5fb0c1f9929070754095f1792902c22d5fb64d7..9ba40c5b1f04758b6a120a8766ad93a2e54205d4 100644 (file)
@@ -119,6 +119,7 @@ import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -622,9 +623,13 @@ public class ClusterAdminRpcService implements ClusterAdminService {
             return newFailedRpcResultFuture("A valid file path must be specified");
         }
 
+        final Uint32 timeout = input.getTimeout();
+        final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS)
+                : SHARD_MGR_TIMEOUT;
+
         final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
-        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
-        Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
+        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(new GetSnapshot(opTimeout));
+        Futures.addCallback(future, new FutureCallback<>() {
             @Override
             public void onSuccess(final List<DatastoreSnapshot> snapshots) {
                 saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
index 2ff50f9b4cecb52b993b393d1df6ce8f755ccf83..d1b9aba9d666268c6ab159f11e543ebc674fefe5 100644 (file)
@@ -279,7 +279,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot();
+            onGetSnapshot((GetSnapshot) message);
         } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof ChangeShardMembersVotingStatus) {
@@ -563,7 +563,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         persistShardList();
     }
 
-    private void onGetSnapshot() {
+    private void onGetSnapshot(final GetSnapshot getSnapshot) {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
@@ -588,7 +588,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for (ShardInformation shardInfo: localShards.values()) {
-            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+            shardInfo.getActor().tell(getSnapshot, replyActor);
         }
     }