+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testReplicateWithPersistencePending() throws Exception {
+ final String leaderId = factory.generateActorId("leader-");
+ final String followerId = factory.generateActorId("follower-");
+
+ final ActorRef followerActor = factory.createActor(MessageCollectorActor.props());
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
+ doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
+
+ TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+ MockRaftActor.props(leaderId, Map.of(followerId, followerActor.path().toString()), config,
+ mockPersistenceProvider), leaderId);
+ MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"),
+ false);
+
+ ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
+ assertNotNull("ReplicatedLogEntry not found", logEntry);
+ assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
+ assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+ leaderActor.handleCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
+ assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+ ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
+ verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
+
+ callbackCaptor.getValue().apply(logEntry);
+
+ assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
+ assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
+ }
+
+ @Test
+ public void testReplicateWithBatchHint() throws Exception {
+ final String leaderId = factory.generateActorId("leader-");
+ final String followerId = factory.generateActorId("follower-");
+
+ final ActorRef followerActor = factory.createActor(MessageCollectorActor.props());
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+ MockRaftActor.props(leaderId, Map.of(followerId, followerActor.path().toString()), config),
+ leaderId);
+ MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leaderActor.handleCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
+ MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true);
+ MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
+ }
+
+ @Test
+ @SuppressWarnings("checkstyle:illegalcatch")
+ public void testApplyStateRace() throws Exception {
+ final String leaderId = factory.generateActorId("leader-");
+ final String followerId = factory.generateActorId("follower-");
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ ActorRef mockFollowerActorRef = factory.createActor(MessageCollectorActor.props());
+
+ TestRaftActor.Builder builder = TestRaftActor.newBuilder()
+ .id(leaderId)
+ .peerAddresses(Map.of(followerId, mockFollowerActorRef.path().toString()))
+ .config(config)
+ .collectorActor(factory.createActor(
+ MessageCollectorActor.props(), factory.generateActorId(leaderId + "-collector")));
+
+ TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+ builder.props(), leaderId);
+ MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ leaderActor.setPersistence(new PersistentDataProvider(leaderActor) {
+ @Override
+ public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+ // needs to be executed from another thread to simulate the persistence actor calling this callback
+ executorService.submit(() -> {
+ try {
+ procedure.apply(entry);
+ } catch (Exception e) {
+ TEST_LOG.info("Fail during async persist callback", e);
+ }
+ }, "persistence-callback");