+ }
+ };
+ }
+
+ @Test
+ public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = factory.generateActorId("leader-");
+ String follower1Id = factory.generateActorId("follower-");
+ String follower2Id = factory.generateActorId("follower-");
+
+ ActorRef followerActor1 =
+ factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
+ ActorRef followerActor2 =
+ factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1Id, followerActor1.path().toString());
+ peerAddresses.put(follower2Id, followerActor2.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(9);
+ leaderActor.getRaftActorContext().setLastApplied(9);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // create 5 entries in the log
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+
+ //set the snapshot index to 4 , 0 to 4 are snapshotted
+ leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+ //setting replicatedToAllIndex = 9, for the log to clear
+ leader.setReplicatedToAllIndex(9);
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // set the 2nd follower nextIndex to 1 which has been snapshotted
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());