- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
- String follower1Id = factory.generateActorId("follower-");
- String follower2Id = factory.generateActorId("follower-");
-
- ActorRef followerActor1 =
- factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
- ActorRef followerActor2 =
- factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(follower1Id, followerActor1.path().toString());
- peerAddresses.put(follower2Id, followerActor2.path().toString());
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
-
- MockRaftActor leaderActor = mockActorRef.underlyingActor();
- leaderActor.getRaftActorContext().setCommitIndex(9);
- leaderActor.getRaftActorContext().setLastApplied(9);
- leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
-
- leaderActor.waitForInitializeBehaviorComplete();
-
- Leader leader = new Leader(leaderActor.getRaftActorContext());
- leaderActor.setCurrentBehavior(leader);
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // create 5 entries in the log
- MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
- leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
-
- //set the snapshot index to 4 , 0 to 4 are snapshotted
- leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
- //setting replicatedToAllIndex = 9, for the log to clear
- leader.setReplicatedToAllIndex(9);
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // set the 2nd follower nextIndex to 1 which has been snapshotted
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
- // simulate a real snapshot
- leaderActor.onReceiveCommand(new SendHeartBeat());
- assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
- leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
- , RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
-
- //reply from a slow follower does not initiate a fake snapshot
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
- assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
-
- ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("foo-0"),
- new MockRaftActorContext.MockPayload("foo-1"),
- new MockRaftActorContext.MockPayload("foo-2"),
- new MockRaftActorContext.MockPayload("foo-3"),
- new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
-
- assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
-
- //reply from a slow follower after should not raise errors
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
- assertEquals(0, leaderActor.getReplicatedLog().size());
- }
- };
- }