X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=8251c6b265bd5ca85e247119104b4e6b268be9f6;hp=168eb3e5f22c9752dcbe089fe2e87393713d2650;hb=c671195065ce54e65f3b4a6da964871536f8a194;hpb=fa3763a80e5c8464252678565978527844ccdf98 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 168eb3e5f2..8251c6b265 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,10 +1,24 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; +import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Terminated; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; @@ -18,6 +32,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -28,27 +43,14 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; import scala.concurrent.duration.FiniteDuration; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - public class LeaderTest extends AbstractRaftActorBehaviorTest { - private ActorRef leaderActor = + private final ActorRef leaderActor = getSystem().actorOf(Props.create(DoNothingActor.class)); - private ActorRef senderActor = + private final ActorRef senderActor = getSystem().actorOf(Props.create(DoNothingActor.class)); @Test @@ -64,46 +66,53 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }}; } - @Test public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() { new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { + @Override protected void run() { - ActorRef followerActor = getTestActor(); MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + String followerId = "follower"; + peerAddresses.put(followerId, followerActor.path().toString()); actorContext.setPeerAddresses(peerAddresses); + long term = 1; + actorContext.getTermInformation().update(term, ""); + Leader leader = new Leader(actorContext); - leader.handleMessage(senderActor, new SendHeartBeat()); - final String out = - new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - Object msg = fromSerializableMessage(in); - if (msg instanceof AppendEntries) { - if (((AppendEntries)msg).getTerm() == 0) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + // Leader should send an immediate heartbeat with no entries as follower is inactive. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); + assertEquals("getTerm", term, appendEntries.getTerm()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); - assertEquals("match", out); + // The follower would normally reply - simulate that explicitly here. + leader.handleMessage(followerActor, new AppendEntriesReply( + followerId, term, true, lastIndex - 1, term)); + assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive()); + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). + getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(senderActor, new SendHeartBeat()); + + appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); } }; }}; @@ -112,50 +121,51 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testHandleReplicateMessageSendAppendEntriesToFollower() { new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { + @Override protected void run() { - ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); + MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + String followerId = "follower"; + peerAddresses.put(followerId, followerActor.path().toString()); actorContext.setPeerAddresses(peerAddresses); + long term = 1; + actorContext.getTermInformation().update(term, ""); + Leader leader = new Leader(actorContext); - RaftActorBehavior raftBehavior = leader - .handleMessage(senderActor, new Replicate(null, null, - new MockRaftActorContext.MockReplicatedLogEntry(1, - 100, - new MockRaftActorContext.MockPayload("foo")) - )); + + // Leader will send an immediate heartbeat - ignore it. + expectMsgClass(duration("5 seconds"), AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + followerId, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive()); + + MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 1, lastIndex + 1, payload); + actorContext.getReplicatedLog().append(newEntry); + RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, + new Replicate(null, null, newEntry)); // State should not change assertTrue(raftBehavior instanceof Leader); - final String out = - new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - Object msg = fromSerializableMessage(in); - if (msg instanceof AppendEntries) { - if (((AppendEntries)msg).getTerm() == 0) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); + AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); } }; }}; @@ -164,8 +174,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testHandleReplicateMessageWhenThereAreNoFollowers() { new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef raftActor = getTestActor(); @@ -192,6 +202,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in instanceof ApplyState) { if (((ApplyState) in).getIdentifier().equals("state-id")) { @@ -263,10 +274,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.getFollowerToSnapshot().getNextChunk(); leader.getFollowerToSnapshot().incrementChunkIndex(); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); - AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching( - followerActor, AppendEntries.SERIALIZABLE_CLASS); + AppendEntries aeproto = MessageCollectorActor.getFirstMatching( + followerActor, AppendEntries.class); assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " + "received", aeproto); @@ -280,9 +294,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(senderActor, new SendHeartBeat()); - InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot) - MessageCollectorActor.getFirstMatching(followerActor, - InstallSnapshot.SERIALIZABLE_CLASS); + InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor, + InstallSnapshot.SERIALIZABLE_CLASS); assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot", isproto); @@ -337,6 +350,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { //update follower timestamp leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( senderActor, new Replicate(null, "state-id", entry)); @@ -415,7 +431,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior raftBehavior = leader.handleMessage( leaderActor, new InitiateInstallSnapshot()); - CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor. + CaptureSnapshot cs = MessageCollectorActor. getFirstMatching(leaderActor, CaptureSnapshot.class); assertNotNull(cs); @@ -425,6 +441,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); + + // if an initiate is started again when first is in progress, it shouldnt initiate Capture + raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot()); + List captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class); + assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size()); + }}; } @@ -465,6 +487,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { Leader leader = new Leader(actorContext); + // Ignore initial heartbeat. + expectMsgClass(duration("5 seconds"), AppendEntries.class); + // new entry ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, @@ -479,6 +504,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in instanceof InstallSnapshotMessages.InstallSnapshot) { InstallSnapshot is = (InstallSnapshot) @@ -531,6 +557,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockLeader leader = new MockLeader(actorContext); + // Ignore initial heartbeat. + expectMsgClass(duration("5 seconds"), AppendEntries.class); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -559,16 +588,256 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertTrue(raftBehavior instanceof Leader); - assertEquals(leader.mapFollowerToSnapshot.size(), 0); - assertEquals(leader.followerToLog.size(), 1); - assertNotNull(leader.followerToLog.get(followerActor.path().toString())); - FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString()); - assertEquals(snapshotIndex, fli.getMatchIndex().get()); - assertEquals(snapshotIndex, fli.getMatchIndex().get()); - assertEquals(snapshotIndex + 1, fli.getNextIndex().get()); + assertEquals(0, leader.followerSnapshotSize()); + assertEquals(1, leader.followerLogSize()); + assertNotNull(leader.getFollower(followerActor.path().toString())); + FollowerLogInformation fli = leader.getFollower(followerActor.path().toString()); + assertEquals(snapshotIndex, fli.getMatchIndex()); + assertEquals(snapshotIndex, fli.getMatchIndex()); + assertEquals(snapshotIndex + 1, fli.getNextIndex()); + }}; + } + @Test + public void testSendSnapshotfromInstallSnapshotReply() throws Exception { + new JavaTestKit(getSystem()) {{ + + TestActorRef followerActor = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply"); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-reply", + followerActor.path().toString()); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){ + @Override + public int getSnapshotChunkSize() { + return 50; + } + }; + configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + actorContext.setConfigParams(configParams); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + + List objectList = MessageCollectorActor.getAllMatching(followerActor, + InstallSnapshotMessages.InstallSnapshot.class); + + assertEquals(1, objectList.size()); + + Object o = objectList.get(0); + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + "follower-reply", installSnapshot.getChunkIndex(), true)); + + objectList = MessageCollectorActor.getAllMatching(followerActor, + InstallSnapshotMessages.InstallSnapshot.class); + + assertEquals(2, objectList.size()); + + installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1); + + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + "follower-reply", installSnapshot.getChunkIndex(), true)); + + objectList = MessageCollectorActor.getAllMatching(followerActor, + InstallSnapshotMessages.InstallSnapshot.class); + + assertEquals(3, objectList.size()); + + installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2); + + // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + "follower-reply", installSnapshot.getChunkIndex(), true)); + + objectList = MessageCollectorActor.getAllMatching(followerActor, + InstallSnapshotMessages.InstallSnapshot.class); + + // Count should still stay at 3 + assertEquals(3, objectList.size()); }}; } + + @Test + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{ + new JavaTestKit(getSystem()) {{ + + TestActorRef followerActor = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower"); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + actorContext.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + + MessageCollectorActor.getAllMatching(followerActor, + InstallSnapshotMessages.InstallSnapshot.class); + + InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); + assertNotNull(installSnapshot); + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + + followerActor.underlyingActor().clear(); + + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + followerActor.path().toString(), -1, false)); + + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); + assertNotNull(installSnapshot); + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + + followerActor.tell(PoisonPill.getInstance(), getRef()); + }}; + } + + @Test + public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef followerActor = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk"); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + + InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); + assertNotNull(installSnapshot); + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode()); + + int hashCode = installSnapshot.getData().hashCode(); + + followerActor.underlyingActor().clear(); + + leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true )); + + installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); + assertNotNull(installSnapshot); + + assertEquals(2, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(hashCode, installSnapshot.getLastChunkHashCode()); + + followerActor.tell(PoisonPill.getInstance(), getRef()); + }}; + } + @Test public void testFollowerToSnapshotLogic() { @@ -626,8 +895,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { return createActorContext(leaderActor); } + @Override protected RaftActorContext createActorContext(ActorRef actorRef) { - return new MockRaftActorContext("test", getSystem(), actorRef); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(100000); + MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef); + context.setConfigParams(configParams); + return context; } private ByteString toByteString(Map state) { @@ -656,43 +931,41 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } public static class ForwardMessageToBehaviorActor extends MessageCollectorActor { - private static AbstractRaftActorBehavior behavior; - - public ForwardMessageToBehaviorActor(){ - - } + AbstractRaftActorBehavior behavior; @Override public void onReceive(Object message) throws Exception { + if(behavior != null) { + behavior.handleMessage(sender(), message); + } + super.onReceive(message); - behavior.handleMessage(sender(), message); } - public static void setBehavior(AbstractRaftActorBehavior behavior){ - ForwardMessageToBehaviorActor.behavior = behavior; + public static Props props() { + return Props.create(ForwardMessageToBehaviorActor.class); } } @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { new JavaTestKit(getSystem()) {{ - - ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + TestActorRef leaderActor = TestActorRef.create(getSystem(), + Props.create(ForwardMessageToBehaviorActor.class)); MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + new MockRaftActorContext("leader", getSystem(), leaderActor); - ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class)); + TestActorRef followerActor = TestActorRef.create(getSystem(), + ForwardMessageToBehaviorActor.props()); MockRaftActorContext followerActorContext = - new MockRaftActorContext("follower", getSystem(), followerActor); + new MockRaftActorContext("follower", getSystem(), followerActor); Follower follower = new Follower(followerActorContext); - - ForwardMessageToBehaviorActor.setBehavior(follower); + followerActor.underlyingActor().behavior = follower; Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + peerAddresses.put("follower", followerActor.path().toString()); leaderActorContext.setPeerAddresses(peerAddresses); @@ -700,7 +973,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { //create 3 entries leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); leaderActorContext.setCommitIndex(1); @@ -708,35 +981,29 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { // follower too has the exact same log entries and has the same commit index followerActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); followerActorContext.setCommitIndex(1); Leader leader = new Leader(leaderActorContext); - leader.markFollowerActive(followerActor.path().toString()); - - leader.handleMessage(leaderActor, new SendHeartBeat()); - - AppendEntriesMessages.AppendEntries appendEntries = - (AppendEntriesMessages.AppendEntries) MessageCollectorActor - .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class); + AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); assertNotNull(appendEntries); assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getLogEntries(0).getIndex()); + assertEquals(0, appendEntries.getEntries().size()); assertEquals(0, appendEntries.getPrevLogIndex()); - AppendEntriesReply appendEntriesReply = - (AppendEntriesReply) MessageCollectorActor.getFirstMatching( + AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( leaderActor, AppendEntriesReply.class); - assertNotNull(appendEntriesReply); - // follower returns its next index assertEquals(2, appendEntriesReply.getLogLastIndex()); assertEquals(1, appendEntriesReply.getLogLastTerm()); + // follower returns its next index + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); }}; } @@ -744,67 +1011,83 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { new JavaTestKit(getSystem()) {{ - - ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + TestActorRef leaderActor = TestActorRef.create(getSystem(), + Props.create(ForwardMessageToBehaviorActor.class)); MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + new MockRaftActorContext("leader", getSystem(), leaderActor); - ActorRef followerActor = getSystem().actorOf( - Props.create(ForwardMessageToBehaviorActor.class)); + TestActorRef followerActor = TestActorRef.create(getSystem(), + ForwardMessageToBehaviorActor.props()); MockRaftActorContext followerActorContext = - new MockRaftActorContext("follower", getSystem(), followerActor); + new MockRaftActorContext("follower", getSystem(), followerActor); Follower follower = new Follower(followerActorContext); - - ForwardMessageToBehaviorActor.setBehavior(follower); + followerActor.underlyingActor().behavior = follower; Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + peerAddresses.put("follower", followerActor.path().toString()); leaderActorContext.setPeerAddresses(peerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); leaderActorContext.setCommitIndex(1); followerActorContext.getReplicatedLog().removeFrom(0); followerActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); // follower has the same log entries but its commit index > leaders commit index followerActorContext.setCommitIndex(2); Leader leader = new Leader(leaderActorContext); - leader.markFollowerActive(followerActor.path().toString()); - - leader.handleMessage(leaderActor, new SendHeartBeat()); - - AppendEntriesMessages.AppendEntries appendEntries = - (AppendEntriesMessages.AppendEntries) MessageCollectorActor - .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class); + // Initial heartbeat + AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); assertNotNull(appendEntries); assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getLogEntries(0).getIndex()); + assertEquals(0, appendEntries.getEntries().size()); assertEquals(0, appendEntries.getPrevLogIndex()); - AppendEntriesReply appendEntriesReply = - (AppendEntriesReply) MessageCollectorActor.getFirstMatching( + AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( leaderActor, AppendEntriesReply.class); + assertNotNull(appendEntriesReply); + + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); + + leaderActor.underlyingActor().behavior = leader; + leader.handleMessage(followerActor, appendEntriesReply); + + leaderActor.underlyingActor().clear(); + followerActor.underlyingActor().clear(); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); + assertNotNull(appendEntries); + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(0, appendEntries.getEntries().size()); + assertEquals(2, appendEntries.getPrevLogIndex()); + + appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); assertNotNull(appendEntriesReply); assertEquals(2, appendEntriesReply.getLogLastIndex()); assertEquals(1, appendEntriesReply.getLogLastTerm()); + assertEquals(1, followerActorContext.getCommitIndex()); }}; } @@ -878,8 +1161,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(2, leaderActorContext.getCommitIndex()); ApplyLogEntries applyLogEntries = - (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor, - ApplyLogEntries.class); + MessageCollectorActor.getFirstMatching(leaderActor, + ApplyLogEntries.class); assertNotNull(applyLogEntries); @@ -941,10 +1224,168 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false)); assertEquals(RaftState.Leader, raftActorBehavior.state()); + }}; + } + @Test + public void testIsolatedLeaderCheckNoFollowers() { + new JavaTestKit(getSystem()) {{ + ActorRef leaderActor = getTestActor(); - }}; + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + Map peerAddresses = new HashMap<>(); + leaderActorContext.setPeerAddresses(peerAddresses); + + Leader leader = new Leader(leaderActorContext); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue(behavior instanceof Leader); + }}; + } + + @Test + public void testIsolatedLeaderCheckTwoFollowers() throws Exception { + new JavaTestKit(getSystem()) {{ + + ActorRef followerActor1 = getTestActor(); + ActorRef followerActor2 = getTestActor(); + + MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext(); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + peerAddresses.put("follower-2", followerActor2.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + + Leader leader = new Leader(leaderActorContext); + leader.stopIsolatedLeaderCheckSchedule(); + + leader.markFollowerActive("follower-1"); + leader.markFollowerActive("follower-2"); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue("Behavior not instance of Leader when all followers are active", + behavior instanceof Leader); + + // kill 1 follower and verify if that got killed + final JavaTestKit probe = new JavaTestKit(getSystem()); + probe.watch(followerActor1); + followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); + final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); + assertEquals(termMsg1.getActor(), followerActor1); + + leader.markFollowerInActive("follower-1"); + leader.markFollowerActive("follower-2"); + behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", + behavior instanceof Leader); + + // kill 2nd follower and leader should change to Isolated leader + followerActor2.tell(PoisonPill.getInstance(), null); + probe.watch(followerActor2); + followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender()); + final Terminated termMsg2 = probe.expectMsgClass(Terminated.class); + assertEquals(termMsg2.getActor(), followerActor2); + + leader.markFollowerInActive("follower-2"); + behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", + behavior instanceof IsolatedLeader); + + }}; + } + + + @Test + public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { + new JavaTestKit(getSystem()) {{ + TestActorRef leaderActor = TestActorRef.create(getSystem(), + Props.create(MessageCollectorActor.class)); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + leaderActorContext.setConfigParams(configParams); + + TestActorRef followerActor = TestActorRef.create(getSystem(), + ForwardMessageToBehaviorActor.props()); + + MockRaftActorContext followerActorContext = + new MockRaftActorContext("follower-reply", getSystem(), followerActor); + + followerActorContext.setConfigParams(configParams); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().behavior = follower; + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-reply", + followerActor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + + leaderActorContext.getReplicatedLog().removeFrom(0); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + followerActorContext.getReplicatedLog().removeFrom(0); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Leader leader = new Leader(leaderActorContext); + + AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( + leaderActor, AppendEntriesReply.class); + assertNotNull(appendEntriesReply); + System.out.println("appendEntriesReply: "+appendEntriesReply); + leader.handleMessage(followerActor, appendEntriesReply); + + // Clear initial heartbeat messages + + leaderActor.underlyingActor().clear(); + followerActor.underlyingActor().clear(); + + // create 3 entries + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + leaderActorContext.setCommitIndex(1); + leaderActorContext.setLastApplied(1); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); + assertNotNull(appendEntries); + + // Should send first log entry + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(0, appendEntries.getEntries().get(0).getIndex()); + assertEquals(-1, appendEntries.getPrevLogIndex()); + + appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); + assertNotNull(appendEntriesReply); + + assertEquals(1, appendEntriesReply.getLogLastTerm()); + assertEquals(0, appendEntriesReply.getLogLastIndex()); + + followerActor.underlyingActor().clear(); + + leader.handleAppendEntriesReply(followerActor, appendEntriesReply); + + appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); + assertNotNull(appendEntries); + + // Should send second log entry + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(1, appendEntries.getEntries().get(0).getIndex()); + }}; } class MockLeader extends Leader { @@ -961,15 +1402,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { public void createFollowerToSnapshot(String followerId, ByteString bs ) { fts = new FollowerToSnapshot(bs); - mapFollowerToSnapshot.put(followerId, fts); - + setFollowerSnapshot(followerId, fts); } } private class MockConfigParamsImpl extends DefaultConfigParamsImpl { - private long electionTimeOutIntervalMillis; - private int snapshotChunkSize; + private final long electionTimeOutIntervalMillis; + private final int snapshotChunkSize; public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { super();