+ @Test
+ public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = factory.generateActorId("leader-");
+ String follower1Id = factory.generateActorId("follower-");
+ String follower2Id = factory.generateActorId("follower-");
+
+ ActorRef followerActor1 =
+ factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
+ ActorRef followerActor2 =
+ factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1Id, followerActor1.path().toString());
+ peerAddresses.put(follower2Id, followerActor2.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(9);
+ leaderActor.getRaftActorContext().setLastApplied(9);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // create 5 entries in the log
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+
+ //set the snapshot index to 4 , 0 to 4 are snapshotted
+ leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+ //setting replicatedToAllIndex = 9, for the log to clear
+ leader.setReplicatedToAllIndex(9);
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // set the 2nd follower nextIndex to 1 which has been snapshotted
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // simulate a real snapshot
+ leaderActor.onReceiveCommand(new SendHeartBeat());
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
+ leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
+ , RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+
+ //reply from a slow follower does not initiate a fake snapshot
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
+ assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
+
+ //reply from a slow follower after should not raise errors
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
+ assertEquals(0, leaderActor.getReplicatedLog().size());