Add optional timeout parameter for backup rpc 05/89905/8
author Tomas Cere <tomas.cere@pantheon.tech>
Fri, 29 Mar 2019 11:38:10 +0000 (12:38 +0100)
committer Robert Varga <nite@hq.sk>
Sun, 26 Jul 2020 08:20:58 +0000 (08:20 +0000)
Once the snapshot size grows large, the backup RPC can take more
time than the default timeout value, causing it to fail.
Add the option to override the timeout in the backup RPC.

Change-Id: I878066668f45abcfe758a7b90d34576bff1b7db0
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java
opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang
opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java

index 0fe9a1a..60fca8c 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
 import java.util.Optional;
@@ -69,7 +70,7 @@ class RaftActorSnapshotMessageSupport {
         } else if (COMMIT_SNAPSHOT.equals(message)) {
             context.getSnapshotManager().commit(-1, -1);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot(sender);
+            onGetSnapshot(sender, (GetSnapshot) message);
         } else {
             return false;
         }
@@ -105,16 +106,20 @@ class RaftActorSnapshotMessageSupport {
         context.getSnapshotManager().apply(message);
     }
 
-    private void onGetSnapshot(ActorRef sender) {
+    private void onGetSnapshot(ActorRef sender, GetSnapshot getSnapshot) {
         log.debug("{}: onGetSnapshot", context.getId());
 
+
         if (context.getPersistenceProvider().isRecoveryApplicable()) {
             CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
                     context.getReplicatedLog().last(), -1, true);
 
+            final FiniteDuration timeout =
+                    getSnapshot.getTimeout().map(Timeout::duration).orElse(snapshotReplyActorTimeout);
+
             ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
-                    ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
-                    snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true)));
+                    ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, timeout, context.getId(),
+                    context.getPeerServerInfo(true)));
 
             cohort.createSnapshot(snapshotReplyActor, Optional.empty());
         } else {
index d51f93f..c9bc2ed 100644 (file)
@@ -7,15 +7,25 @@
  */
 package org.opendaylight.controller.cluster.raft.client.messages;
 
+import akka.util.Timeout;
+import java.util.Optional;
+
 /**
- * Internal client message to get a snapshot of the current state based on whether or not persistence is
- * enabled. Returns a {@link GetSnapshotReply} instance.
+ * Internal client message to get a snapshot of the current state based on whether or not persistence is enabled.
+ * Returns a {@link GetSnapshotReply} instance.
  *
  * @author Thomas Pantelis
  */
 public final class GetSnapshot {
-    public static final GetSnapshot INSTANCE = new GetSnapshot();
+    public static final GetSnapshot INSTANCE = new GetSnapshot(null);
+
+    private final Timeout timeout;
+
+    public GetSnapshot(final Timeout timeout) {
+        this.timeout = timeout;
+    }
 
-    private GetSnapshot() {
+    public Optional<Timeout> getTimeout() {
+        return Optional.ofNullable(timeout);
     }
 }
index 109845c..260623f 100644 (file)
@@ -232,6 +232,15 @@ module cluster-admin {
               type string;
               description "The path and name of the file in which to store the backup.";
             }
+
+            leaf timeout {
+              type uint32 {
+                range 1..max;
+              }
+              units "seconds";
+              description "Optional timeout in seconds for the backup operation which will override all the different
+                           timeouts that are being hit on the backend.";
+            }
         }
 
         description "Creates a backup file of the datastore state";
index b5fb0c1..9ba40c5 100644 (file)
@@ -119,6 +119,7 @@ import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -622,9 +623,13 @@ public class ClusterAdminRpcService implements ClusterAdminService {
             return newFailedRpcResultFuture("A valid file path must be specified");
         }
 
+        final Uint32 timeout = input.getTimeout();
+        final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS)
+                : SHARD_MGR_TIMEOUT;
+
         final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
-        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
-        Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
+        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(new GetSnapshot(opTimeout));
+        Futures.addCallback(future, new FutureCallback<>() {
             @Override
             public void onSuccess(final List<DatastoreSnapshot> snapshots) {
                 saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
index 2ff50f9..d1b9aba 100644 (file)
@@ -279,7 +279,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof WrappedShardResponse) {
             onWrappedShardResponse((WrappedShardResponse) message);
         } else if (message instanceof GetSnapshot) {
-            onGetSnapshot();
+            onGetSnapshot((GetSnapshot) message);
         } else if (message instanceof ServerRemoved) {
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof ChangeShardMembersVotingStatus) {
@@ -563,7 +563,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         persistShardList();
     }
 
-    private void onGetSnapshot() {
+    private void onGetSnapshot(final GetSnapshot getSnapshot) {
         LOG.debug("{}: onGetSnapshot", persistenceId());
 
         List<String> notInitialized = null;
@@ -588,7 +588,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for (ShardInformation shardInfo: localShards.values()) {
-            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+            shardInfo.getActor().tell(getSnapshot, replyActor);
         }
     }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.