import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.Optional;
} else if (COMMIT_SNAPSHOT.equals(message)) {
context.getSnapshotManager().commit(-1, -1);
} else if (message instanceof GetSnapshot) {
- onGetSnapshot(sender);
+ onGetSnapshot(sender, (GetSnapshot) message);
} else {
return false;
}
context.getSnapshotManager().apply(message);
}
- private void onGetSnapshot(ActorRef sender) {
+ private void onGetSnapshot(ActorRef sender, GetSnapshot getSnapshot) {
log.debug("{}: onGetSnapshot", context.getId());
+
if (context.getPersistenceProvider().isRecoveryApplicable()) {
CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
context.getReplicatedLog().last(), -1, true);
+ final FiniteDuration timeout =
+ getSnapshot.getTimeout().map(Timeout::duration).orElse(snapshotReplyActorTimeout);
+
ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
- ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
- snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true)));
+ ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, timeout, context.getId(),
+ context.getPeerServerInfo(true)));
cohort.createSnapshot(snapshotReplyActor, Optional.empty());
} else {
*/
package org.opendaylight.controller.cluster.raft.client.messages;
+import akka.util.Timeout;
+import java.util.Optional;
+
/**
- * Internal client message to get a snapshot of the current state based on whether or not persistence is
- * enabled. Returns a {@link GetSnapshotReply} instance.
+ * Internal client message to get a snapshot of the current state based on whether or not persistence is enabled.
+ * Returns a {@link GetSnapshotReply} instance.
*
* @author Thomas Pantelis
*/
public final class GetSnapshot {
- public static final GetSnapshot INSTANCE = new GetSnapshot();
+ public static final GetSnapshot INSTANCE = new GetSnapshot(null);
+
+ private final Timeout timeout;
+
+ public GetSnapshot(final Timeout timeout) {
+ this.timeout = timeout;
+ }
- private GetSnapshot() {
+ public Optional<Timeout> getTimeout() {
+ return Optional.ofNullable(timeout);
}
}
type string;
description "The path and name of the file in which to store the backup.";
}
+
+ leaf timeout {
+ type uint32 {
+ range 1..max;
+ }
+ units "seconds";
+ description "Optional timeout in seconds for the backup operation which will override all the different
+ timeouts that are being hit on the backend.";
+ }
}
description "Creates a backup file of the datastore state";
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
return newFailedRpcResultFuture("A valid file path must be specified");
}
+ final Uint32 timeout = input.getTimeout();
+ final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS)
+ : SHARD_MGR_TIMEOUT;
+
final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
- ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
- Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
+ ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(new GetSnapshot(opTimeout));
+ Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(final List<DatastoreSnapshot> snapshots) {
saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
} else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
} else if (message instanceof GetSnapshot) {
- onGetSnapshot();
+ onGetSnapshot((GetSnapshot) message);
} else if (message instanceof ServerRemoved) {
onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof ChangeShardMembersVotingStatus) {
persistShardList();
}
- private void onGetSnapshot() {
+ private void onGetSnapshot(final GetSnapshot getSnapshot) {
LOG.debug("{}: onGetSnapshot", persistenceId());
List<String> notInitialized = null;
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
for (ShardInformation shardInfo: localShards.values()) {
- shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+ shardInfo.getActor().tell(getSnapshot, replyActor);
}
}