+
+ @Test
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ followerActor.path().toString(), -1, false));
+
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+
+ @Test
+ public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+ int hashCode = installSnapshot.getData().hashCode();
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+