+ };
+ }};
+ }
+
+ @Test
+ public void testHandleReplicateMessageSendAppendEntriesToFollower() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef followerActor = getTestActor();
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap();
+
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(actorContext);
+ RaftActorBehavior raftBehavior = leader
+ .handleMessage(senderActor, new Replicate(null, null,
+ new MockRaftActorContext.MockReplicatedLogEntry(1,
+ 100,
+ new MockRaftActorContext.MockPayload("foo"))
+ ));
+
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
+
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ Object msg = fromSerializableMessage(in);
+ if (msg instanceof AppendEntries) {
+ if (((AppendEntries)msg).getTerm() == 0) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleReplicateMessageWhenThereAreNoFollowers() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef raftActor = getTestActor();
+
+ MockRaftActorContext actorContext =
+ new MockRaftActorContext("test", getSystem(), raftActor);
+
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ actorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
+ .build());
+
+ Leader leader = new Leader(actorContext);
+ RaftActorBehavior raftBehavior = leader
+ .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
+
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
+
+ assertEquals(1, actorContext.getCommitIndex());
+
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"),
+ "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof ApplyState) {
+ if (((ApplyState) in).getIdentifier().equals("state-id")) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testSendInstallSnapshot() {
+ new LeaderTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext(getRef());
+ actorContext.setPeerAddresses(peerAddresses);
+
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshot(
+ toByteString(leadersSnapshot));
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+ MockLeader leader = new MockLeader(actorContext);
+ // set the follower info in leader
+ leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftActorBehavior raftBehavior = leader.handleMessage(
+ senderActor, new Replicate(null, "state-id", entry));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
+ Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+ @Override
+ protected Boolean match(Object o) throws Exception {
+ if (o instanceof SendInstallSnapshot) {
+ return true;
+ }
+ return false;
+ }
+ }.get();
+
+ boolean sendInstallSnapshotReceived = false;
+ for (Boolean b: matches) {
+ sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
+ }
+
+ assertTrue(sendInstallSnapshotReceived);
+
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testInstallSnapshot() {
+ new LeaderTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ actorContext.setPeerAddresses(peerAddresses);
+
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ MockLeader leader = new MockLeader(actorContext);
+ // set the follower info in leader
+ leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ // check if installsnapshot gets called with the correct values.
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+ InstallSnapshot is = (InstallSnapshot)
+ SerializationUtils.fromSerializable(in);
+ if (is.getData() == null) {
+ return "InstallSnapshot data is null";
+ }
+ if (is.getLastIncludedIndex() != snapshotIndex) {
+ return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+ }
+ if (is.getLastIncludedTerm() != snapshotTerm) {
+ return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+ }
+ if (is.getTerm() == currentTerm) {
+ return is.getTerm() + "!=" + currentTerm;
+ }
+
+ return "match";
+
+ } else {
+ return "message mismatch:" + in.getClass();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+ }
+ };
+ }};
+ }