+ assertEquals(0, leader.followerSnapshotSize());
+ assertEquals(1, leader.followerLogSize());
+ assertNotNull(leader.getFollower(followerActor.path().toString()));
+ FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex());
+ }};
+ }
+
+ @Test
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());