summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
d2c2643)
Once the snapshot size grows large the backup rpc can take more
time than the default timeout value causing it to fail.
Add the option to override the timeout in the backup rpc.
Change-Id: I878066668f45abcfe758a7b90d34576bff1b7db0
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.Optional;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.Optional;
} else if (COMMIT_SNAPSHOT.equals(message)) {
context.getSnapshotManager().commit(-1, -1);
} else if (message instanceof GetSnapshot) {
} else if (COMMIT_SNAPSHOT.equals(message)) {
context.getSnapshotManager().commit(-1, -1);
} else if (message instanceof GetSnapshot) {
+ onGetSnapshot(sender, (GetSnapshot) message);
context.getSnapshotManager().apply(message);
}
context.getSnapshotManager().apply(message);
}
- private void onGetSnapshot(ActorRef sender) {
+ private void onGetSnapshot(ActorRef sender, GetSnapshot getSnapshot) {
log.debug("{}: onGetSnapshot", context.getId());
log.debug("{}: onGetSnapshot", context.getId());
if (context.getPersistenceProvider().isRecoveryApplicable()) {
CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
context.getReplicatedLog().last(), -1, true);
if (context.getPersistenceProvider().isRecoveryApplicable()) {
CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
context.getReplicatedLog().last(), -1, true);
+ final FiniteDuration timeout =
+ getSnapshot.getTimeout().map(Timeout::duration).orElse(snapshotReplyActorTimeout);
+
ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
- ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
- snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true)));
+ ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, timeout, context.getId(),
+ context.getPeerServerInfo(true)));
cohort.createSnapshot(snapshotReplyActor, Optional.empty());
} else {
cohort.createSnapshot(snapshotReplyActor, Optional.empty());
} else {
*/
package org.opendaylight.controller.cluster.raft.client.messages;
*/
package org.opendaylight.controller.cluster.raft.client.messages;
+import akka.util.Timeout;
+import java.util.Optional;
+
- * Internal client message to get a snapshot of the current state based on whether or not persistence is
- * enabled. Returns a {@link GetSnapshotReply} instance.
+ * Internal client message to get a snapshot of the current state based on whether or not persistence is enabled.
+ * Returns a {@link GetSnapshotReply} instance.
*
* @author Thomas Pantelis
*/
public final class GetSnapshot {
*
* @author Thomas Pantelis
*/
public final class GetSnapshot {
- public static final GetSnapshot INSTANCE = new GetSnapshot();
+ public static final GetSnapshot INSTANCE = new GetSnapshot(null);
+
+ private final Timeout timeout;
+
+ public GetSnapshot(final Timeout timeout) {
+ this.timeout = timeout;
+ }
- private GetSnapshot() {
+ public Optional<Timeout> getTimeout() {
+ return Optional.ofNullable(timeout);
type string;
description "The path and name of the file in which to store the backup.";
}
type string;
description "The path and name of the file in which to store the backup.";
}
+
+ leaf timeout {
+ type uint32 {
+ range 1..max;
+ }
+ units "seconds";
+ description "Optional timeout in seconds for the backup operation which will override all the different
+ timeouts that are being hit on the backend.";
+ }
}
description "Creates a backup file of the datastore state";
}
description "Creates a backup file of the datastore state";
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
return newFailedRpcResultFuture("A valid file path must be specified");
}
return newFailedRpcResultFuture("A valid file path must be specified");
}
+ final Uint32 timeout = input.getTimeout();
+ final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS)
+ : SHARD_MGR_TIMEOUT;
+
final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
- ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
- Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
+ ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(new GetSnapshot(opTimeout));
+ Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(final List<DatastoreSnapshot> snapshots) {
saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
@Override
public void onSuccess(final List<DatastoreSnapshot> snapshots) {
saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
} else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
} else if (message instanceof GetSnapshot) {
} else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
} else if (message instanceof GetSnapshot) {
+ onGetSnapshot((GetSnapshot) message);
} else if (message instanceof ServerRemoved) {
onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof ChangeShardMembersVotingStatus) {
} else if (message instanceof ServerRemoved) {
onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof ChangeShardMembersVotingStatus) {
- private void onGetSnapshot() {
+ private void onGetSnapshot(final GetSnapshot getSnapshot) {
LOG.debug("{}: onGetSnapshot", persistenceId());
List<String> notInitialized = null;
LOG.debug("{}: onGetSnapshot", persistenceId());
List<String> notInitialized = null;
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
for (ShardInformation shardInfo: localShards.values()) {
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
for (ShardInformation shardInfo: localShards.values()) {
- shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+ shardInfo.getActor().tell(getSnapshot, replyActor);