Allow incremental recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
index 4855f42931da658d3d6f7912edb930fd78498bf8..0d29c31ee4b3b1d5f9e879c3a014317fa188e850 100644 (file)
@@ -14,17 +14,32 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatchers;
@@ -44,6 +59,7 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,30 +75,31 @@ public class RaftActorRecoverySupportTest {
     @Mock
     private DataPersistenceProvider mockPersistence;
 
-
     @Mock
     private RaftActorRecoveryCohort mockCohort;
 
-    @Mock
-    private RaftActorSnapshotCohort mockSnapshotCohort;
-
     @Mock
     PersistentDataProvider mockPersistentProvider;
 
+    ActorRef mockActorRef;
+
+    ActorSystem mockActorSystem;
+
     private RaftActorRecoverySupport support;
 
     private RaftActorContext context;
     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
     private final String localId = "leader";
 
-
     @Before
     public void setup() {
         MockitoAnnotations.initMocks(this);
-
-        context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
-                LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
-                mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor());
+        mockActorSystem = ActorSystem.create();
+        mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
+        context = new RaftActorContextImpl(mockActorRef, null, localId,
+                new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
+                Collections.<String, String>emptyMap(), configParams, mockPersistence, applyState -> {
+        }, LOG, MoreExecutors.directExecutor());
 
         support = new RaftActorRecoverySupport(context, mockCohort);
 
@@ -159,6 +176,40 @@ public class RaftActorRecoverySupportTest {
         inOrder.verifyNoMoreInteractions();
     }
 
+    @Test
+    public void testIncrementalRecovery() {
+        int recoverySnapshotInterval = 3;
+        int numberOfEntries = 5;
+        configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+        Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
+        context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
+
+        ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+
+        for (int i = 0; i <= numberOfEntries; i++) {
+            replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
+                new MockRaftActorContext.MockPayload(String.valueOf(i))));
+        }
+
+        AtomicInteger entryCount = new AtomicInteger();
+        ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
+            int run = entryCount.getAndIncrement();
+            LOG.info("Sending entry number {}", run);
+            sendMessageToSupport(new ApplyJournalEntries(run));
+        }, 0, 1, TimeUnit.SECONDS);
+
+        ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
+            numberOfEntries, TimeUnit.SECONDS);
+        try {
+            canceller.get();
+            verify(mockSnapshotConsumer, times(1)).accept(any());
+            applyEntriesExecutor.shutdown();
+        } catch (InterruptedException | ExecutionException e) {
+            Assert.fail();
+        }
+    }
+
     @Test
     public void testOnSnapshotOffer() {
 
@@ -184,7 +235,7 @@ public class RaftActorRecoverySupportTest {
                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
-        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
 
         sendMessageToSupport(snapshotOffer);
 
@@ -298,8 +349,8 @@ public class RaftActorRecoverySupportTest {
     }
 
     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
-        return ArgumentMatchers.argThat(
-            other -> term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
+        return ArgumentMatchers.argThat(other ->
+                term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
     }
 
     @Test
@@ -380,16 +431,16 @@ public class RaftActorRecoverySupportTest {
         long electionTerm = 2;
         String electionVotedFor = "member-2";
         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
-                                                        new ServerInfo(localId, true),
-                                                        new ServerInfo("follower1", true),
-                                                        new ServerInfo("follower2", true)));
+                new ServerInfo(localId, true),
+                new ServerInfo("follower1", true),
+                new ServerInfo("follower2", true)));
 
         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
         Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
-        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
 
         sendMessageToSupport(snapshotOffer);
 
@@ -398,6 +449,6 @@ public class RaftActorRecoverySupportTest {
         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
-            Sets.newHashSet(context.getPeerIds()));
+                Sets.newHashSet(context.getPeerIds()));
     }
-}
+}
\ No newline at end of file