*/
int getSnapshotDataThresholdPercentage();
+
+ /**
+ * Returns the interval (in seconds) after which a snapshot should be taken during recovery.
+ * Zero means don't take snapshots (the corresponding setter rejects negative values).
+ *
+ * @return the interval of recovery snapshot in seconds
+ */
+ int getRecoverySnapshotIntervalSeconds();
+
/**
* Returns the interval at which a heart beat message should be sent to remote followers.
*
private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class);
private static final int SNAPSHOT_BATCH_COUNT = 20000;
+ /**
+ * Interval after which a snapshot should be taken during the recovery process. 0 means never.
+ */
+ private static final int RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+ private int recoverySnapshotIntervalSeconds = RECOVERY_SNAPSHOT_INTERVAL_SECONDS;
private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
private FiniteDuration electionTimeOutInterval;
this.snapshotBatchCount = snapshotBatchCount;
}
+ /**
+ * Sets the interval in seconds after which a snapshot should be taken during recovery.
+ *
+ * @param recoverySnapshotInterval the interval in seconds; 0 means never take recovery snapshots
+ * @throws IllegalArgumentException if the interval is negative
+ */
+ public void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
+ checkArgument(recoverySnapshotInterval >= 0);
+ this.recoverySnapshotIntervalSeconds = recoverySnapshotInterval;
+ }
+
public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) {
this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
}
return snapshotDataThresholdPercentage;
}
+ @Override
+ public int getRecoverySnapshotIntervalSeconds() {
+ // No "this." qualifier - consistent with the other getters in this class.
+ return recoverySnapshotIntervalSeconds;
+ }
+
@Override
public FiniteDuration getHeartBeatInterval() {
return heartBeatInterval;
import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
private boolean hasMigratedDataRecovered;
private Stopwatch recoveryTimer;
+ private Stopwatch recoverySnapshotTimer;
private final Logger log;
RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
return context.getReplicatedLog();
}
- private void initRecoveryTimer() {
+ private void initRecoveryTimers() {
+ // Lazily start the overall recovery stopwatch and, when a positive recovery snapshot
+ // interval is configured, the stopwatch that paces snapshots taken during recovery.
if (recoveryTimer == null) {
recoveryTimer = Stopwatch.createStarted();
}
+ if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
+ recoverySnapshotTimer = Stopwatch.createStarted();
+ }
}
private void onRecoveredSnapshot(final SnapshotOffer offer) {
log.debug("{}: SnapshotOffer called.", context.getId());
- initRecoveryTimer();
+ initRecoveryTimers();
Snapshot snapshot = (Snapshot) offer.snapshot();
if (logEntry != null) {
lastApplied++;
batchRecoveredLogEntry(logEntry);
+ if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
+ if (currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();
+ }
+ context.setLastApplied(lastApplied);
+ context.setCommitIndex(lastApplied);
+ takeRecoverySnapshot(logEntry);
+ }
} else {
// Shouldn't happen but cover it anyway.
log.error("{}: Log entry not found for index {}", context.getId(), i);
}
private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
- initRecoveryTimer();
+ initRecoveryTimers();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
if (!isServerConfigurationPayload(logEntry)) {
}
}
+ /**
+ * Attempts to capture a snapshot for the given recovered log entry. On success the recovery
+ * snapshot timer is reset; otherwise the capture is retried on the next recovered entry.
+ */
+ private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
+ // Prefix log messages with the actor id, consistent with the rest of this class.
+ log.info("{}: Time for recovery snapshot on entry with index {}", context.getId(), logEntry.getIndex());
+ final SnapshotManager snapshotManager = context.getSnapshotManager();
+ if (snapshotManager.capture(logEntry, -1)) {
+ log.info("{}: Capturing snapshot, resetting timer for the next recovery snapshot interval.",
+ context.getId());
+ recoverySnapshotTimer.reset().start();
+ } else {
+ log.info("{}: SnapshotManager is not able to capture snapshot at this time. It will be retried "
+ + "again with the next recovered entry.", context.getId());
+ }
+ }
+
+ // True when recovery snapshotting is enabled (timer was started in initRecoveryTimers) and
+ // the configured interval has elapsed since the last recovery snapshot / recovery start.
+ private boolean shouldTakeRecoverySnapshot() {
+ return this.recoverySnapshotTimer != null && this.recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+ >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
+ }
+
private void endCurrentLogRecoveryBatch() {
cohort.applyCurrentLogRecoveryBatch();
currentRecoveryBatchCount = 0;
recoveryTime = "";
}
+ if (recoverySnapshotTimer != null) {
+ recoverySnapshotTimer.stop();
+ recoverySnapshotTimer = null;
+ }
+
log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
+ "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Mock
private DataPersistenceProvider mockPersistence;
-
@Mock
private RaftActorRecoveryCohort mockCohort;
- @Mock
- private RaftActorSnapshotCohort mockSnapshotCohort;
-
@Mock
PersistentDataProvider mockPersistentProvider;
+ ActorRef mockActorRef;
+
+ ActorSystem mockActorSystem;
+
private RaftActorRecoverySupport support;
private RaftActorContext context;
private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
private final String localId = "leader";
-
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
-
- context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
- LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
- mockPersistence, applyState -> { }, LOG, MoreExecutors.directExecutor());
+ mockActorSystem = ActorSystem.create();
+ mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
+ context = new RaftActorContextImpl(mockActorRef, null, localId,
+ new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
+ Collections.<String, String>emptyMap(), configParams, mockPersistence, applyState -> {
+ }, LOG, MoreExecutors.directExecutor());
support = new RaftActorRecoverySupport(context, mockCohort);
inOrder.verifyNoMoreInteractions();
}
+ @Test
+ public void testIncrementalRecovery() {
+ int recoverySnapshotInterval = 3;
+ int numberOfEntries = 5;
+ configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+ @SuppressWarnings("unchecked")
+ Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
+ context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
+
+ ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
+ ReplicatedLog replicatedLog = context.getReplicatedLog();
+
+ for (int i = 0; i <= numberOfEntries; i++) {
+ replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
+ new MockRaftActorContext.MockPayload(String.valueOf(i))));
+ }
+
+ // Apply one journal entry per second; the 3-second recovery snapshot interval should
+ // trigger exactly one snapshot capture over the 5-second run.
+ AtomicInteger entryCount = new AtomicInteger();
+ ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
+ int run = entryCount.getAndIncrement();
+ LOG.info("Sending entry number {}", run);
+ sendMessageToSupport(new ApplyJournalEntries(run));
+ }, 0, 1, TimeUnit.SECONDS);
+
+ ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
+ numberOfEntries, TimeUnit.SECONDS);
+ try {
+ canceller.get();
+ verify(mockSnapshotConsumer, times(1)).accept(any());
+ } catch (InterruptedException | ExecutionException e) {
+ // Preserve the failure cause instead of a bare fail().
+ Assert.fail(e.getMessage());
+ } finally {
+ // Always release the executor thread, even when verification throws.
+ applyEntriesExecutor.shutdownNow();
+ }
+ }
+
@Test
public void testOnSnapshotOffer() {
lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
- SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+ SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
sendMessageToSupport(snapshotOffer);
}
static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
- return ArgumentMatchers.argThat(
- other -> term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
+ return ArgumentMatchers.argThat(other ->
+ term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
}
@Test
long electionTerm = 2;
String electionVotedFor = "member-2";
ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
- new ServerInfo(localId, true),
- new ServerInfo("follower1", true),
- new ServerInfo("follower2", true)));
+ new ServerInfo(localId, true),
+ new ServerInfo("follower1", true),
+ new ServerInfo("follower2", true)));
MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
-1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
- SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+ SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
sendMessageToSupport(snapshotOffer);
assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
- Sets.newHashSet(context.getPeerIds()));
+ Sets.newHashSet(context.getPeerIds()));
}
-}
+}
\ No newline at end of file
# Multiplicator of shard-leader-election-timeout-in-seconds for the purposes of initial datastore
# convergence. Each frontend datastore instance will wait specified amount of time before becoming
# exposed as a service. A value of 0 indicates waiting forever. Defaults to 3.
-initial-settle-timeout-multiplier=3
+#initial-settle-timeout-multiplier=3
+
+# Interval in seconds after which a snapshot should be taken during the recovery process.
+# A value of 0 means snapshots are never taken during recovery. Defaults to 0.
public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+ public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+ setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
+ setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
+ /**
+ * Sets the interval in seconds after which a snapshot should be taken during the recovery process.
+ * Zero means don't take snapshots.
+ *
+ * @param recoverySnapshotInterval the interval in seconds
+ */
+ private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
+ raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+ }
+
@Deprecated
private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
// We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
return this;
}
+ /**
+ * Sets the interval in seconds after which a snapshot should be taken during the recovery process.
+ *
+ * @param recoverySnapshotIntervalSeconds the interval in seconds; 0 means never take snapshots
+ * @return this Builder
+ * @throws IllegalArgumentException if the value is negative
+ */
+ public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
+ checkArgument(recoverySnapshotIntervalSeconds >= 0);
+ datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
+ return this;
+ }
+
public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
return this;
Zero value means wait indefinitely (as long as it takes).";
}
+ leaf recovery-snapshot-interval-seconds {
+ default 0;
+ type uint32;
+ description "Interval in seconds after which a snapshot should be taken during the recovery process. 0 means never.";
+ }
+
leaf shard-batched-modification-count {
default 1000;
type non-zero-uint32-type;
properties.put("shard-initialization-timeout-in-seconds", "82");
properties.put("shard-leader-election-timeout-in-seconds", "66");
properties.put("initial-settle-timeout-multiplier", "5");
+ properties.put("recovery-snapshot-interval-seconds", "360");
properties.put("shard-isolated-leader-check-interval-in-millis", "123");
properties.put("shard-snapshot-data-threshold-percentage", "100");
properties.put("shard-election-timeout-factor", "21");
assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
assertEquals(5, context.getInitialSettleTimeoutMultiplier());
+ assertEquals(360, context.getShardRaftConfig().getRecoverySnapshotIntervalSeconds());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
builder.shardTransactionCommitTimeoutInSeconds(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS + 1);
builder.shardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1);
builder.shardSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT + 1);
+ builder.recoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS + 1);
builder.shardHeartbeatIntervalInMillis(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1);
builder.shardTransactionCommitQueueCapacity(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1);
builder.shardInitializationTimeout(DEFAULT_SHARD_INITIALIZATION_TIMEOUT
assertEquals(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1,
context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
assertEquals(DEFAULT_SNAPSHOT_BATCH_COUNT + 1, context.getShardRaftConfig().getSnapshotBatchCount());
+ assertEquals(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS + 1,
+ context.getShardRaftConfig().getRecoverySnapshotIntervalSeconds());
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1,
context.getShardRaftConfig().getHeartBeatInterval().length());
assertEquals(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1, context.getShardTransactionCommitQueueCapacity());