+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testReplicateWithPersistencePending() throws Exception {
+ final String leaderId = factory.generateActorId("leader-");
+ final String followerId = factory.generateActorId("follower-");
+
+ final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
+ doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
+
+ TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+ MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config,
+ mockPersistenceProvider), leaderId);
+ MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"),
+ false);
+
+ ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
+ assertNotNull("ReplicatedLogEntry not found", logEntry);
+ assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
+ assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
+ assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+ ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
+ verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
+
+ callbackCaptor.getValue().apply(logEntry);
+
+ assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
+ assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
+ }
+
+ @Test
+ public void testReplicateWithBatchHint() throws Exception {
+ final String leaderId = factory.generateActorId("leader-");
+ final String followerId = factory.generateActorId("follower-");
+
+ final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+ MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config),
+ leaderId);
+ MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
+ MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true);
+ MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
+ }