+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext(leaderActor);
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(followersLastIndex);
+ //set follower timeout to 2 mins, helps during debugging
+ actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ //update follower timestamp
+ leader.markFollowerActive(followerActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+ leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+
+ //send first chunk and no InstallSnapshotReply received yet
+ leader.getFollowerToSnapshot().getNextChunk();
+ leader.getFollowerToSnapshot().incrementChunkIndex();
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
+ followerActor, AppendEntries.SERIALIZABLE_CLASS);
+
+ assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
+ "received", aeproto);
+
+ AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+
+ assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+
+ //InstallSnapshotReply received
+ leader.getFollowerToSnapshot().markSendStatus(true);
+
+ leader.handleMessage(senderActor, new SendHeartBeat());
+
+ InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
+ MessageCollectorActor.getFirstMatching(followerActor,
+ InstallSnapshot.SERIALIZABLE_CLASS);
+
+ assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
+ isproto);
+
+ InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+
+ assertEquals(snapshotIndex, is.getLastIncludedIndex());
+
+ }};
+ }
+
+ @Test
+ public void testSendAppendEntriesSnapshotScenario() {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext(getRef());
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ Leader leader = new Leader(actorContext);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ //update follower timestamp
+ leader.markFollowerActive(followerActor.path().toString());
+
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftActorBehavior raftBehavior = leader.handleMessage(
+ senderActor, new Replicate(null, "state-id", entry));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
+ Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+ @Override
+ protected Boolean match(Object o) throws Exception {
+ if (o instanceof InitiateInstallSnapshot) {
+ return true;
+ }
+ return false;
+ }
+ }.get();
+
+ boolean initiateInitiateInstallSnapshot = false;
+ for (Boolean b: matches) {
+ initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
+ }
+
+ assertTrue(initiateInitiateInstallSnapshot);
+ }};
+ }
+
+ @Test
+ public void testInitiateInstallSnapshot() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext(leaderActor);
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setLastApplied(3);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ Leader leader = new Leader(actorContext);
+ // set the snapshot as absent and check if capture-snapshot is invoked.
+ leader.setSnapshot(Optional.<ByteString>absent());
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ actorContext.getReplicatedLog().append(entry);
+
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftActorBehavior raftBehavior = leader.handleMessage(
+ leaderActor, new InitiateInstallSnapshot());
+
+ CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+ getFirstMatching(leaderActor, CaptureSnapshot.class);
+
+ assertNotNull(cs);
+
+ assertTrue(cs.isInstallSnapshotInitiated());
+ assertEquals(3, cs.getLastAppliedIndex());
+ assertEquals(1, cs.getLastAppliedTerm());
+ assertEquals(4, cs.getLastIndex());
+ assertEquals(2, cs.getLastTerm());
+ }};
+ }
+
+ @Test
+ public void testInstallSnapshot() {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ actorContext.setPeerAddresses(peerAddresses);
+
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ actorContext.setCommitIndex(followersLastIndex);
+
+ Leader leader = new Leader(actorContext);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+ new SendInstallSnapshot(toByteString(leadersSnapshot)));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ // check if installsnapshot gets called with the correct values.
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+ InstallSnapshot is = (InstallSnapshot)
+ SerializationUtils.fromSerializable(in);
+ if (is.getData() == null) {
+ return "InstallSnapshot data is null";
+ }
+ if (is.getLastIncludedIndex() != snapshotIndex) {
+ return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+ }
+ if (is.getLastIncludedTerm() != snapshotTerm) {
+ return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+ }
+ if (is.getTerm() == currentTerm) {
+ return is.getTerm() + "!=" + currentTerm;
+ }
+
+ return "match";
+
+ } else {
+ return "message mismatch:" + in.getClass();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+ }};
+ }
+
+ @Test
+ public void testHandleInstallSnapshotReplyLastChunk() {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+ leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+ while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+ leader.getFollowerToSnapshot().getNextChunk();
+ leader.getFollowerToSnapshot().incrementChunkIndex();
+ }
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+ new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+ leader.getFollowerToSnapshot().getChunkIndex(), true));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+ assertEquals(leader.followerToLog.size(), 1);
+ assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+ FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+ assertEquals(snapshotIndex, fli.getMatchIndex().get());
+ assertEquals(snapshotIndex, fli.getMatchIndex().get());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex().get());