Bug 5419: Persist log entries asycnhronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index 7477f14168abf82353643ef90c6236e2b4bed895..da0493fe225137469a524e004204de257ff2aed3 100644 (file)
@@ -397,7 +397,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
 
-        verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
+        verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class));
     }
 
     @Test
@@ -1283,4 +1283,49 @@ public class RaftActorTest extends AbstractActorTest {
 
         TEST_LOG.info("testLeaderTransitioning ending");
     }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    public void testReplicateWithPersistencePending() throws Exception {
+        final String leaderId = factory.generateActorId("leader-");
+        final String followerId = factory.generateActorId("follower-");
+
+        final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+        DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
+        doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
+
+        TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+                MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config,
+                        mockPersistenceProvider), leaderId);
+        MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+        leaderActor.waitForInitializeBehaviorComplete();
+
+        leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+        Leader leader = new Leader(leaderActor.getRaftActorContext());
+        leaderActor.setCurrentBehavior(leader);
+
+        leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"));
+
+        ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
+        assertNotNull("ReplicatedLogEntry not found", logEntry);
+        assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
+        assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+        leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
+        assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+        ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
+
+        callbackCaptor.getValue().apply(logEntry);
+
+        assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
+        assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
+    }
 }