X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=168eb3e5f22c9752dcbe089fe2e87393713d2650;hp=19af64790ff7896f496f6585aa27939d76fa4f65;hb=d480609098769004b964043d7ee4c7458de19fbc;hpb=b3e553ce5b3d3e972cbe19465ab7af2fcb39934c diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 19af64790f..168eb3e5f2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,21 +1,23 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.Props; import akka.testkit.JavaTestKit; +import com.google.common.base.Optional; import com.google.protobuf.ByteString; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -23,17 +25,20 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; +import scala.concurrent.duration.FiniteDuration; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -54,8 +59,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { // handle message should return the Leader state when it receives an // unknown message - RaftState state = leader.handleMessage(senderActor, "foo"); - Assert.assertEquals(RaftState.Leader, state); + RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo"); + Assert.assertTrue(behavior instanceof Leader); }}; } @@ -69,10 +74,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); + MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); - Map peerAddresses = new HashMap(); + Map peerAddresses = new HashMap<>(); peerAddresses.put(followerActor.path().toString(), followerActor.path().toString()); @@ -117,7 +121,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); - Map peerAddresses = new HashMap(); + Map peerAddresses = new HashMap<>(); peerAddresses.put(followerActor.path().toString(), followerActor.path().toString()); @@ -125,7 +129,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { actorContext.setPeerAddresses(peerAddresses); Leader leader = new Leader(actorContext); - RaftState raftState = leader + RaftActorBehavior raftBehavior = leader .handleMessage(senderActor, new Replicate(null, null, new MockRaftActorContext.MockReplicatedLogEntry(1, 100, @@ -133,7 +137,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { )); // State should not change - assertEquals(RaftState.Leader, raftState); + assertTrue(raftBehavior instanceof Leader); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { @@ -152,10 +156,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }.get(); // this extracts the received message assertEquals("match", out); - } - - }; }}; } @@ -179,11 +180,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { .build()); Leader leader = new Leader(actorContext); - RaftState raftState = leader + RaftActorBehavior raftBehavior = leader .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1))); // State should not change - assertEquals(RaftState.Leader, raftState); + assertTrue(raftBehavior instanceof Leader); assertEquals(1, actorContext.getCommitIndex()); @@ -211,229 +212,360 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } @Test - public void testSendInstallSnapshot() { - new LeaderTestKit(getSystem()) {{ + public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); - new Within(duration("1 seconds")) { - protected void run() { - ActorRef followerActor = getTestActor(); + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); - Map peerAddresses = new HashMap(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(leaderActor); + actorContext.setPeerAddresses(peerAddresses); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(getRef()); - actorContext.setPeerAddresses(peerAddresses); + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setCommitIndex(followersLastIndex); + //set follower timeout to 2 mins, helps during debugging + actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10)); - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + MockLeader leader = new MockLeader(actorContext); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; - - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshot( - toByteString(leadersSnapshot)); - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - - MockLeader leader = new MockLeader(actorContext); - // set the follower info in leader - leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); - - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex - RaftState raftState = leader.handleMessage( - senderActor, new Replicate(null, "state-id", entry)); - - assertEquals(RaftState.Leader, raftState); - - // we might receive some heartbeat messages, so wait till we SendInstallSnapshot - Boolean[] matches = new ReceiveWhile(Boolean.class, duration("2 seconds")) { - @Override - protected Boolean match(Object o) throws Exception { - if (o instanceof SendInstallSnapshot) { - return true; - } - return false; - } - }.get(); + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); - boolean sendInstallSnapshotReceived = false; - for (Boolean b: matches) { - sendInstallSnapshotReceived = b | sendInstallSnapshotReceived; - } + //update follower timestamp + leader.markFollowerActive(followerActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + leader.createFollowerToSnapshot(followerActor.path().toString(), bs); + + //send first chunk and no InstallSnapshotReply received yet + leader.getFollowerToSnapshot().getNextChunk(); + leader.getFollowerToSnapshot().incrementChunkIndex(); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching( + followerActor, AppendEntries.SERIALIZABLE_CLASS); + + assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " + + "received", aeproto); + + AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto); + + assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty()); + + //InstallSnapshotReply received + leader.getFollowerToSnapshot().markSendStatus(true); + + leader.handleMessage(senderActor, new SendHeartBeat()); + + InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot) + MessageCollectorActor.getFirstMatching(followerActor, + InstallSnapshot.SERIALIZABLE_CLASS); + + assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot", + isproto); + + InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto); + + assertEquals(snapshotIndex, is.getLastIncludedIndex()); + + }}; + } + + @Test + public void testSendAppendEntriesSnapshotScenario() { + new JavaTestKit(getSystem()) {{ + + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(getRef()); + actorContext.setPeerAddresses(peerAddresses); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setCommitIndex(followersLastIndex); + + Leader leader = new Leader(actorContext); - assertTrue(sendInstallSnapshotReceived); + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + //update follower timestamp + leader.markFollowerActive(followerActor.path().toString()); + + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex + RaftActorBehavior raftBehavior = leader.handleMessage( + senderActor, new Replicate(null, "state-id", entry)); + + assertTrue(raftBehavior instanceof Leader); + + // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot + Boolean[] matches = new ReceiveWhile(Boolean.class, duration("2 seconds")) { + @Override + protected Boolean match(Object o) throws Exception { + if (o instanceof InitiateInstallSnapshot) { + return true; + } + return false; } - }; + }.get(); + + boolean initiateInitiateInstallSnapshot = false; + for (Boolean b: matches) { + initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot; + } + + assertTrue(initiateInitiateInstallSnapshot); }}; } @Test - public void testInstallSnapshot() { - new LeaderTestKit(getSystem()) {{ + public void testInitiateInstallSnapshot() throws Exception { + new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - protected void run() { - ActorRef followerActor = getTestActor(); + ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); - Map peerAddresses = new HashMap(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); - actorContext.setPeerAddresses(peerAddresses); + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(leaderActor); + actorContext.setPeerAddresses(peerAddresses); - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot)); - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setLastApplied(3); + actorContext.setCommitIndex(followersLastIndex); - MockLeader leader = new MockLeader(actorContext); - // set the follower info in leader - leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + Leader leader = new Leader(actorContext); + // set the snapshot as absent and check if capture-snapshot is invoked. + leader.setSnapshot(Optional.absent()); - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); - RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot()); + actorContext.getReplicatedLog().append(entry); - assertEquals(RaftState.Leader, raftState); + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex + RaftActorBehavior raftBehavior = leader.handleMessage( + leaderActor, new InitiateInstallSnapshot()); - // check if installsnapshot gets called with the correct values. - final String out = - new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof InstallSnapshotMessages.InstallSnapshot) { - InstallSnapshot is = (InstallSnapshot) - SerializationUtils.fromSerializable(in); - if (is.getData() == null) { - return "InstallSnapshot data is null"; - } - if (is.getLastIncludedIndex() != snapshotIndex) { - return is.getLastIncludedIndex() + "!=" + snapshotIndex; - } - if (is.getLastIncludedTerm() != snapshotTerm) { - return is.getLastIncludedTerm() + "!=" + snapshotTerm; - } - if (is.getTerm() == currentTerm) { - return is.getTerm() + "!=" + currentTerm; - } + CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor. + getFirstMatching(leaderActor, CaptureSnapshot.class); - return "match"; + assertNotNull(cs); - } else { - return "message mismatch:" + in.getClass(); - } + assertTrue(cs.isInstallSnapshotInitiated()); + assertEquals(3, cs.getLastAppliedIndex()); + assertEquals(1, cs.getLastAppliedTerm()); + assertEquals(4, cs.getLastIndex()); + assertEquals(2, cs.getLastTerm()); + }}; + } + + @Test + public void testInstallSnapshot() { + new JavaTestKit(getSystem()) {{ + + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + actorContext.setPeerAddresses(peerAddresses); + + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setCommitIndex(followersLastIndex); + + Leader leader = new Leader(actorContext); + + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, + new SendInstallSnapshot(toByteString(leadersSnapshot))); + + assertTrue(raftBehavior instanceof Leader); + + // check if installsnapshot gets called with the correct values. + final String out = + new ExpectMsg(duration("1 seconds"), "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof InstallSnapshotMessages.InstallSnapshot) { + InstallSnapshot is = (InstallSnapshot) + SerializationUtils.fromSerializable(in); + if (is.getData() == null) { + return "InstallSnapshot data is null"; + } + if (is.getLastIncludedIndex() != snapshotIndex) { + return is.getLastIncludedIndex() + "!=" + snapshotIndex; + } + if (is.getLastIncludedTerm() != snapshotTerm) { + return is.getLastIncludedTerm() + "!=" + snapshotTerm; + } + if (is.getTerm() == currentTerm) { + return is.getTerm() + "!=" + currentTerm; } - }.get(); // this extracts the received message - assertEquals("match", out); - } - }; + return "match"; + + } else { + return "message mismatch:" + in.getClass(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); }}; } @Test public void testHandleInstallSnapshotReplyLastChunk() { - new LeaderTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - protected void run() { - ActorRef followerActor = getTestActor(); + new JavaTestKit(getSystem()) {{ - Map peerAddresses = new HashMap(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); - actorContext.setPeerAddresses(peerAddresses); + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; - - MockLeader leader = new MockLeader(actorContext); - // set the follower info in leader - leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); - - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshot( - toByteString(leadersSnapshot)); - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - - ByteString bs = toByteString(leadersSnapshot); - leader.createFollowerToSnapshot(followerActor.path().toString(), bs); - while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) { - leader.getFollowerToSnapshot().getNextChunk(); - leader.getFollowerToSnapshot().incrementChunkIndex(); - } + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); - RaftState raftState = leader.handleMessage(senderActor, - new InstallSnapshotReply(currentTerm, followerActor.path().toString(), - leader.getFollowerToSnapshot().getChunkIndex(), true)); + MockLeader leader = new MockLeader(actorContext); - assertEquals(RaftState.Leader, raftState); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - assertEquals(leader.mapFollowerToSnapshot.size(), 0); - assertEquals(leader.followerToLog.size(), 1); - assertNotNull(leader.followerToLog.get(followerActor.path().toString())); - FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString()); - assertEquals(snapshotIndex, fli.getMatchIndex().get()); - assertEquals(snapshotIndex, fli.getMatchIndex().get()); - assertEquals(snapshotIndex + 1, fli.getNextIndex().get()); - } - }; + // set the snapshot variables in replicatedlog + + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + leader.createFollowerToSnapshot(followerActor.path().toString(), bs); + while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) { + leader.getFollowerToSnapshot().getNextChunk(); + leader.getFollowerToSnapshot().incrementChunkIndex(); + } + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, + new InstallSnapshotReply(currentTerm, followerActor.path().toString(), + leader.getFollowerToSnapshot().getChunkIndex(), true)); + + assertTrue(raftBehavior instanceof Leader); + + assertEquals(leader.mapFollowerToSnapshot.size(), 0); + assertEquals(leader.followerToLog.size(), 1); + assertNotNull(leader.followerToLog.get(followerActor.path().toString())); + FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString()); + assertEquals(snapshotIndex, fli.getMatchIndex().get()); + assertEquals(snapshotIndex, fli.getMatchIndex().get()); + assertEquals(snapshotIndex + 1, fli.getNextIndex().get()); }}; } @@ -558,7 +690,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { ForwardMessageToBehaviorActor.setBehavior(follower); - Map peerAddresses = new HashMap(); + Map peerAddresses = new HashMap<>(); peerAddresses.put(followerActor.path().toString(), followerActor.path().toString()); @@ -581,6 +713,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { followerActorContext.setCommitIndex(1); Leader leader = new Leader(leaderActorContext); + leader.markFollowerActive(followerActor.path().toString()); leader.handleMessage(leaderActor, new SendHeartBeat()); @@ -627,7 +760,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { ForwardMessageToBehaviorActor.setBehavior(follower); - Map peerAddresses = new HashMap(); + Map peerAddresses = new HashMap<>(); peerAddresses.put(followerActor.path().toString(), followerActor.path().toString()); @@ -649,6 +782,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { followerActorContext.setCommitIndex(2); Leader leader = new Leader(leaderActorContext); + leader.markFollowerActive(followerActor.path().toString()); leader.handleMessage(leaderActor, new SendHeartBeat()); @@ -674,28 +808,143 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }}; } - private static class LeaderTestKit extends JavaTestKit { + @Test + public void testHandleAppendEntriesReplyFailure(){ + new JavaTestKit(getSystem()) { + { - private LeaderTestKit(ActorSystem actorSystem) { - super(actorSystem); - } + ActorRef leaderActor = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); - protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ - // Wait for a specific log message to show up - final boolean result = - new JavaTestKit.EventFilter(logLevel - ) { - @Override - protected Boolean run() { - return true; - } - }.from(subject.path().toString()) - .message(logMessage) - .occurrences(1).exec(); + ActorRef followerActor = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); - Assert.assertEquals(true, result); - } + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", + followerActor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + + Leader leader = new Leader(leaderActorContext); + + AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1); + + RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + + assertEquals(RaftState.Leader, raftActorBehavior.state()); + + }}; + } + + @Test + public void testHandleAppendEntriesReplySuccess() throws Exception { + new JavaTestKit(getSystem()) { + { + + ActorRef leaderActor = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + ActorRef followerActor = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", + followerActor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setCommitIndex(1); + leaderActorContext.setLastApplied(1); + leaderActorContext.getTermInformation().update(1, "leader"); + + Leader leader = new Leader(leaderActorContext); + + AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1); + + RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + + assertEquals(RaftState.Leader, raftActorBehavior.state()); + + assertEquals(2, leaderActorContext.getCommitIndex()); + + ApplyLogEntries applyLogEntries = + (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor, + ApplyLogEntries.class); + + assertNotNull(applyLogEntries); + + assertEquals(2, leaderActorContext.getLastApplied()); + + assertEquals(2, applyLogEntries.getToIndex()); + + List applyStateList = MessageCollectorActor.getAllMatching(leaderActor, + ApplyState.class); + + assertEquals(1,applyStateList.size()); + + ApplyState applyState = (ApplyState) applyStateList.get(0); + + assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); + + }}; + } + + @Test + public void testHandleAppendEntriesReplyUnknownFollower(){ + new JavaTestKit(getSystem()) { + { + + ActorRef leaderActor = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + Leader leader = new Leader(leaderActorContext); + + AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1); + + RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply); + + assertEquals(RaftState.Leader, raftActorBehavior.state()); + + }}; + } + + @Test + public void testHandleRequestVoteReply(){ + new JavaTestKit(getSystem()) { + { + + ActorRef leaderActor = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + Leader leader = new Leader(leaderActorContext); + + RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true)); + + assertEquals(RaftState.Leader, raftActorBehavior.state()); + + raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false)); + + assertEquals(RaftState.Leader, raftActorBehavior.state()); + + + }}; + } class MockLeader extends Leader { @@ -706,14 +955,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { super(context); } - public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - new AtomicLong(nextIndex), - new AtomicLong(matchIndex)); - followerToLog.put(followerId, followerLogInformation); - } - public FollowerToSnapshot getFollowerToSnapshot() { return fts; } @@ -724,4 +965,26 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } } + + private class MockConfigParamsImpl extends DefaultConfigParamsImpl { + + private long electionTimeOutIntervalMillis; + private int snapshotChunkSize; + + public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { + super(); + this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis; + this.snapshotChunkSize = snapshotChunkSize; + } + + @Override + public FiniteDuration getElectionTimeOutInterval() { + return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS); + } + + @Override + public int getSnapshotChunkSize() { + return snapshotChunkSize; + } + } }