Allow incremental recovery 77/87077/13
authorTibor Král <tibor.kral@pantheon.tech>
Wed, 22 Jan 2020 11:53:41 +0000 (12:53 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 12 Jun 2020 04:31:23 +0000 (04:31 +0000)
Expose configuration knob in DatastoreContext to specify the
time interval (in seconds) during journal recovery after which a
Snapshot should be taken and the journal purged.

JIRA: CONTROLLER-1915
Change-Id: I4b20a0abe0329965ca5ac1ab5df7d9ca8480cfb2
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java

index 070218e92e46448a74f82fd2416c1458b2f320c5..c5c78130e8fcb22c5c614a03032a74acae40a2bc 100644 (file)
@@ -40,6 +40,15 @@ public interface ConfigParams {
      */
     int getSnapshotDataThresholdPercentage();
 
      */
     int getSnapshotDataThresholdPercentage();
 
+
+    /**
+     * Returns the interval (in seconds) after which a snapshot should be taken during recovery.
+     * A value of 0 means don't take snapshots.
+     *
+     * @return the interval of recovery snapshot in seconds
+     */
+    int getRecoverySnapshotIntervalSeconds();
+
     /**
      * Returns the interval at which a heart beat message should be sent to remote followers.
      *
     /**
      * Returns the interval at which a heart beat message should be sent to remote followers.
      *
index a7a5c62769e8f36dacda70ef0be6dc7f739dffe8..97838b03212f8ea8b3fbd6201995b398a9b61977 100644 (file)
@@ -29,6 +29,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class);
 
     private static final int SNAPSHOT_BATCH_COUNT = 20000;
     private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class);
 
     private static final int SNAPSHOT_BATCH_COUNT = 20000;
+    /**
+     * Interval (in seconds) after which a snapshot should be taken during the recovery process. 0 means never.
+     */
+    private static final int RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
 
     private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
 
 
     private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
 
@@ -56,6 +60,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+    private int recoverySnapshotIntervalSeconds = RECOVERY_SNAPSHOT_INTERVAL_SECONDS;
     private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
     private FiniteDuration electionTimeOutInterval;
 
     private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
     private FiniteDuration electionTimeOutInterval;
 
@@ -86,6 +91,11 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.snapshotBatchCount = snapshotBatchCount;
     }
 
         this.snapshotBatchCount = snapshotBatchCount;
     }
 
+    public void setRecoverySnapshotIntervalSeconds(int recoverySnapshotInterval) {
+        checkArgument(recoverySnapshotInterval >= 0);
+        this.recoverySnapshotIntervalSeconds = recoverySnapshotInterval;
+    }
+
     public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) {
         this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
     }
     public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) {
         this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
     }
@@ -138,6 +148,11 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return snapshotDataThresholdPercentage;
     }
 
         return snapshotDataThresholdPercentage;
     }
 
+    @Override
+    public int getRecoverySnapshotIntervalSeconds() {
+        return this.recoverySnapshotIntervalSeconds;
+    }
+
     @Override
     public FiniteDuration getHeartBeatInterval() {
         return heartBeatInterval;
     @Override
     public FiniteDuration getHeartBeatInterval() {
         return heartBeatInterval;
index 873b8514a2e30038f4327a01507df063b2468211..10375f9406666234bb3f6f69d14cb2ea003ef7f8 100644 (file)
@@ -11,6 +11,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
 import java.util.Collections;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
@@ -39,6 +40,7 @@ class RaftActorRecoverySupport {
     private boolean hasMigratedDataRecovered;
 
     private Stopwatch recoveryTimer;
     private boolean hasMigratedDataRecovered;
 
     private Stopwatch recoveryTimer;
+    private Stopwatch recoverySnapshotTimer;
     private final Logger log;
 
     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
     private final Logger log;
 
     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
@@ -100,16 +102,19 @@ class RaftActorRecoverySupport {
         return context.getReplicatedLog();
     }
 
         return context.getReplicatedLog();
     }
 
-    private void initRecoveryTimer() {
+    private void initRecoveryTimers() {
         if (recoveryTimer == null) {
             recoveryTimer = Stopwatch.createStarted();
         }
         if (recoveryTimer == null) {
             recoveryTimer = Stopwatch.createStarted();
         }
+        if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
+            recoverySnapshotTimer = Stopwatch.createStarted();
+        }
     }
 
     private void onRecoveredSnapshot(final SnapshotOffer offer) {
         log.debug("{}: SnapshotOffer called.", context.getId());
 
     }
 
     private void onRecoveredSnapshot(final SnapshotOffer offer) {
         log.debug("{}: SnapshotOffer called.", context.getId());
 
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -200,6 +205,14 @@ class RaftActorRecoverySupport {
             if (logEntry != null) {
                 lastApplied++;
                 batchRecoveredLogEntry(logEntry);
             if (logEntry != null) {
                 lastApplied++;
                 batchRecoveredLogEntry(logEntry);
+                if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
+                    if (currentRecoveryBatchCount > 0) {
+                        endCurrentLogRecoveryBatch();
+                    }
+                    context.setLastApplied(lastApplied);
+                    context.setCommitIndex(lastApplied);
+                    takeRecoverySnapshot(logEntry);
+                }
             } else {
                 // Shouldn't happen but cover it anyway.
                 log.error("{}: Log entry not found for index {}", context.getId(), i);
             } else {
                 // Shouldn't happen but cover it anyway.
                 log.error("{}: Log entry not found for index {}", context.getId(), i);
@@ -220,7 +233,7 @@ class RaftActorRecoverySupport {
     }
 
     private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
     }
 
     private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
         if (!isServerConfigurationPayload(logEntry)) {
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
         if (!isServerConfigurationPayload(logEntry)) {
@@ -236,6 +249,23 @@ class RaftActorRecoverySupport {
         }
     }
 
         }
     }
 
+    private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
+        log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
+        final SnapshotManager snapshotManager = context.getSnapshotManager();
+        if (snapshotManager.capture(logEntry, -1)) {
+            log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
+            this.recoverySnapshotTimer.reset().start();
+        } else {
+            log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
+                + "again with the next recovered entry.");
+        }
+    }
+
+    private boolean shouldTakeRecoverySnapshot() {
+        return this.recoverySnapshotTimer != null && this.recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+            >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
+    }
+
     private void endCurrentLogRecoveryBatch() {
         cohort.applyCurrentLogRecoveryBatch();
         currentRecoveryBatchCount = 0;
     private void endCurrentLogRecoveryBatch() {
         cohort.applyCurrentLogRecoveryBatch();
         currentRecoveryBatchCount = 0;
@@ -254,6 +284,11 @@ class RaftActorRecoverySupport {
             recoveryTime = "";
         }
 
             recoveryTime = "";
         }
 
+        if (recoverySnapshotTimer != null) {
+            recoverySnapshotTimer.stop();
+            recoverySnapshotTimer = null;
+        }
+
         log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
                 + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
                 replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
         log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
                 + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
                 replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
index 4855f42931da658d3d6f7912edb930fd78498bf8..0d29c31ee4b3b1d5f9e879c3a014317fa188e850 100644 (file)
@@ -14,17 +14,32 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatchers;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatchers;
@@ -44,6 +59,7 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,30 +75,31 @@ public class RaftActorRecoverySupportTest {
     @Mock
     private DataPersistenceProvider mockPersistence;
 
     @Mock
     private DataPersistenceProvider mockPersistence;
 
-
     @Mock
     private RaftActorRecoveryCohort mockCohort;
 
     @Mock
     private RaftActorRecoveryCohort mockCohort;
 
-    @Mock
-    private RaftActorSnapshotCohort mockSnapshotCohort;
-
     @Mock
     PersistentDataProvider mockPersistentProvider;
 
     @Mock
     PersistentDataProvider mockPersistentProvider;
 
+    ActorRef mockActorRef;
+
+    ActorSystem mockActorSystem;
+
     private RaftActorRecoverySupport support;
 
     private RaftActorContext context;
     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
     private final String localId = "leader";
 
     private RaftActorRecoverySupport support;
 
     private RaftActorContext context;
     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
     private final String localId = "leader";
 
-
     @Before
     public void setup() {
         MockitoAnnotations.initMocks(this);
     @Before
     public void setup() {
         MockitoAnnotations.initMocks(this);
-
-        context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
-                LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
-                mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor());
+        mockActorSystem = ActorSystem.create();
+        mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
+        context = new RaftActorContextImpl(mockActorRef, null, localId,
+                new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
+                Collections.<String, String>emptyMap(), configParams, mockPersistence, applyState -> {
+        }, LOG, MoreExecutors.directExecutor());
 
         support = new RaftActorRecoverySupport(context, mockCohort);
 
 
         support = new RaftActorRecoverySupport(context, mockCohort);
 
@@ -159,6 +176,40 @@ public class RaftActorRecoverySupportTest {
         inOrder.verifyNoMoreInteractions();
     }
 
         inOrder.verifyNoMoreInteractions();
     }
 
+    @Test
+    public void testIncrementalRecovery() {
+        int recoverySnapshotInterval = 3;
+        int numberOfEntries = 5;
+        configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+        Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
+        context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
+
+        ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+
+        for (int i = 0; i <= numberOfEntries; i++) {
+            replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
+                new MockRaftActorContext.MockPayload(String.valueOf(i))));
+        }
+
+        AtomicInteger entryCount = new AtomicInteger();
+        ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
+            int run = entryCount.getAndIncrement();
+            LOG.info("Sending entry number {}", run);
+            sendMessageToSupport(new ApplyJournalEntries(run));
+        }, 0, 1, TimeUnit.SECONDS);
+
+        ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
+            numberOfEntries, TimeUnit.SECONDS);
+        try {
+            canceller.get();
+            verify(mockSnapshotConsumer, times(1)).accept(any());
+            applyEntriesExecutor.shutdown();
+        } catch (InterruptedException | ExecutionException e) {
+            Assert.fail();
+        }
+    }
+
     @Test
     public void testOnSnapshotOffer() {
 
     @Test
     public void testOnSnapshotOffer() {
 
@@ -184,7 +235,7 @@ public class RaftActorRecoverySupportTest {
                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
-        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
 
         sendMessageToSupport(snapshotOffer);
 
 
         sendMessageToSupport(snapshotOffer);
 
@@ -298,8 +349,8 @@ public class RaftActorRecoverySupportTest {
     }
 
     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
     }
 
     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
-        return ArgumentMatchers.argThat(
-            other -> term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
+        return ArgumentMatchers.argThat(other ->
+                term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
     }
 
     @Test
     }
 
     @Test
@@ -380,16 +431,16 @@ public class RaftActorRecoverySupportTest {
         long electionTerm = 2;
         String electionVotedFor = "member-2";
         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
         long electionTerm = 2;
         String electionVotedFor = "member-2";
         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
-                                                        new ServerInfo(localId, true),
-                                                        new ServerInfo("follower1", true),
-                                                        new ServerInfo("follower2", true)));
+                new ServerInfo(localId, true),
+                new ServerInfo("follower1", true),
+                new ServerInfo("follower2", true)));
 
         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
         Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
 
         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
         Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
-        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
 
         sendMessageToSupport(snapshotOffer);
 
 
         sendMessageToSupport(snapshotOffer);
 
@@ -398,6 +449,6 @@ public class RaftActorRecoverySupportTest {
         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
-            Sets.newHashSet(context.getPeerIds()));
+                Sets.newHashSet(context.getPeerIds()));
     }
     }
-}
+}
\ No newline at end of file
index 727aa5ddaed51b3e6da9867fc6fb5ebf4a22f7db..608ea328c5e63f58d04ed11499fc729145bd93c2 100644 (file)
@@ -116,4 +116,7 @@ operational.persistent=false
 # Multiplicator of shard-leader-election-timeout-in-seconds for the purposes of initial datastore
 # convergence. Each frontend datastore instance will wait specified amount of time before becoming
 # exposed as a service. A value of 0 indicates waiting forever. Defaults to 3.
 # Multiplicator of shard-leader-election-timeout-in-seconds for the purposes of initial datastore
 # convergence. Each frontend datastore instance will wait specified amount of time before becoming
 # exposed as a service. A value of 0 indicates waiting forever. Defaults to 3.
-initial-settle-timeout-multiplier=3
+#initial-settle-timeout-multiplier=3
+
+# Interval in seconds after which a snapshot should be taken during the recovery process.
+# A value of 0 disables recovery snapshots. Defaults to 0.
+#recovery-snapshot-interval-seconds=0
index 54a14ff9dfafdbcd42353e40f22d41b9a3eecb00..4bd3974557037100e4e2c82a597ad7093fcbd6c6 100644 (file)
@@ -44,6 +44,7 @@ public class DatastoreContext implements ClientActorConfig {
     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+    public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
@@ -105,6 +106,7 @@ public class DatastoreContext implements ClientActorConfig {
     DatastoreContext() {
         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
     DatastoreContext() {
         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+        setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
@@ -142,6 +144,7 @@ public class DatastoreContext implements ClientActorConfig {
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
+        setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
@@ -296,6 +299,14 @@ public class DatastoreContext implements ClientActorConfig {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
+    /**
+     * Sets the interval in seconds after which a snapshot should be taken during the recovery process.
+     * A value of 0 means don't take snapshots.
+     */
+    private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
+        raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+    }
+
     @Deprecated
     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
     @Deprecated
     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
@@ -415,6 +426,12 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
             return this;
         }
 
+        public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
+            checkArgument(recoverySnapshotIntervalSeconds >= 0);
+            datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
+            return this;
+        }
+
         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             return this;
         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             return this;
index 7a932b8b1107b599d6a9f5b494e516224e59fbe1..e2e146c7c57756a37888fe89440dacd12c93498a 100644 (file)
@@ -151,6 +151,12 @@ module distributed-datastore-provider {
                          Zero value means wait indefinitely (as long as it takes).";
         }
 
                          Zero value means wait indefinitely (as long as it takes).";
         }
 
+        leaf recovery-snapshot-interval-seconds {
+            default 0;
+            type uint32;
+            description "Interval after which a snapshot should be taken during the recovery process.";
+        }
+
         leaf shard-batched-modification-count {
             default 1000;
             type non-zero-uint32-type;
         leaf shard-batched-modification-count {
             default 1000;
             type non-zero-uint32-type;
index 06e865b91c4fb3719f9bd6550dfef37aad8afc6c..40ceda29ce787a1f331ddc28fd137d1dda53661d 100644 (file)
@@ -61,6 +61,7 @@ public class DatastoreContextIntrospectorTest {
         properties.put("shard-initialization-timeout-in-seconds", "82");
         properties.put("shard-leader-election-timeout-in-seconds", "66");
         properties.put("initial-settle-timeout-multiplier", "5");
         properties.put("shard-initialization-timeout-in-seconds", "82");
         properties.put("shard-leader-election-timeout-in-seconds", "66");
         properties.put("initial-settle-timeout-multiplier", "5");
+        properties.put("recovery-snapshot-interval-seconds", "360");
         properties.put("shard-isolated-leader-check-interval-in-millis", "123");
         properties.put("shard-snapshot-data-threshold-percentage", "100");
         properties.put("shard-election-timeout-factor", "21");
         properties.put("shard-isolated-leader-check-interval-in-millis", "123");
         properties.put("shard-snapshot-data-threshold-percentage", "100");
         properties.put("shard-election-timeout-factor", "21");
@@ -87,6 +88,7 @@ public class DatastoreContextIntrospectorTest {
         assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
         assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
         assertEquals(5, context.getInitialSettleTimeoutMultiplier());
         assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
         assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
         assertEquals(5, context.getInitialSettleTimeoutMultiplier());
+        assertEquals(360, context.getShardRaftConfig().getRecoverySnapshotIntervalSeconds());
         assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
         assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
         assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
         assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
         assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
         assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
index 42b1cf79d079fe600901c2989ab41d6614206cf8..6cab9db49dd96155894fe3296add23e0ac1565bf 100644 (file)
@@ -17,6 +17,7 @@ import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEF
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
@@ -77,6 +78,7 @@ public class DatastoreContextTest {
         builder.shardTransactionCommitTimeoutInSeconds(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS + 1);
         builder.shardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1);
         builder.shardSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT + 1);
         builder.shardTransactionCommitTimeoutInSeconds(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS + 1);
         builder.shardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1);
         builder.shardSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT + 1);
+        builder.recoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS + 1);
         builder.shardHeartbeatIntervalInMillis(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1);
         builder.shardTransactionCommitQueueCapacity(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1);
         builder.shardInitializationTimeout(DEFAULT_SHARD_INITIALIZATION_TIMEOUT
         builder.shardHeartbeatIntervalInMillis(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1);
         builder.shardTransactionCommitQueueCapacity(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1);
         builder.shardInitializationTimeout(DEFAULT_SHARD_INITIALIZATION_TIMEOUT
@@ -126,6 +128,8 @@ public class DatastoreContextTest {
         assertEquals(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1,
                 context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
         assertEquals(DEFAULT_SNAPSHOT_BATCH_COUNT + 1, context.getShardRaftConfig().getSnapshotBatchCount());
         assertEquals(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1,
                 context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
         assertEquals(DEFAULT_SNAPSHOT_BATCH_COUNT + 1, context.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS + 1,
+                context.getShardRaftConfig().getRecoverySnapshotIntervalSeconds());
         assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1,
                 context.getShardRaftConfig().getHeartBeatInterval().length());
         assertEquals(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1, context.getShardTransactionCommitQueueCapacity());
         assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1,
                 context.getShardRaftConfig().getHeartBeatInterval().length());
         assertEquals(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1, context.getShardTransactionCommitQueueCapacity());