From: tpantelis Date: Tue, 10 Feb 2015 13:15:07 +0000 (-0500) Subject: Refactor LeaderTest X-Git-Tag: release/lithium~536^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=03e752cbd625921ece92c5281cd4e1a8c81b3210 Refactor LeaderTest Removed the remaining Within blocks in tests. Changed each test to use the leaderActor and followerActor class members for consistency. The actors are killed on tearDown. The creation of a JavaTestKit block is no longer necessary and was removed from each test. Added a shorcut method createActorContextWithFollower that creates the leader context and adds a peer follower. This eliminates some duplicate code in the tests. Added an expectFirstMatching method to MessageCollector that waits for the message in contrast to getFirstMatching. Added some more assertions in some tests. Change-Id: Id4ee0291352999b31e40abd9895e3d0237acf432 Signed-off-by: tpantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 935c4f0bbd..04b9f163f4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -100,4 +100,16 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { public long timeSinceLastActivity() { return stopwatch.elapsed(TimeUnit.MILLISECONDS); } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex) + .append(", matchIndex=").append(matchIndex).append(", stopwatch=") + .append(stopwatch.elapsed(TimeUnit.MILLISECONDS)) + .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]"); + return builder.toString(); + } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 4d33152b41..e10fb8d293 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -53,7 +53,7 @@ public class MockRaftActorContext implements RaftActorContext { * Identifier of the actor whose election term information this is */ private final String id = id1; - private long currentTerm = 0; + private long currentTerm = 1; private String votedFor = ""; @Override @@ -88,8 +88,9 @@ public class MockRaftActorContext implements RaftActorContext { public void initReplicatedLog(){ this.replicatedLog = new SimpleReplicatedLog(); - this.replicatedLog.append(new MockReplicatedLogEntry(1, 0, new MockPayload("1"))); - this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("2"))); + long term = getTermInformation().getCurrentTerm(); + this.replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1"))); + this.replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2"))); } @Override public ActorRef actorOf(Props props) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 59046fd779..a0663efdd5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -897,24 +897,24 @@ public class RaftActorTest extends AbstractActorTest { // sleeping for a minimum of 2 seconds, if it spans more its fine. Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); - List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); assertNotNull(matches); assertEquals(3, matches.size()); // check if the notifier got a role change from null to Follower - RoleChanged raftRoleChanged = (RoleChanged) matches.get(0); + RoleChanged raftRoleChanged = matches.get(0); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertNull(raftRoleChanged.getOldRole()); assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); // check if the notifier got a role change from Follower to Candidate - raftRoleChanged = (RoleChanged) matches.get(1); + raftRoleChanged = matches.get(1); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); // check if the notifier got a role change from Candidate to Leader - raftRoleChanged = (RoleChanged) matches.get(2); + raftRoleChanged = matches.get(2); assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java index 6872c8fa45..b47df13fed 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java @@ -21,6 +21,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import java.util.LinkedList; import java.util.List; @@ -108,10 +109,14 @@ public class TestActorFactory implements AutoCloseable { } @Override - public void close() throws Exception { - for(ActorRef actor : createdActors){ - LOG.info("Killing actor {}", actor); - actor.tell(PoisonPill.getInstance(), null); - } + public void close() { + new JavaTestKit(system) {{ + for(ActorRef actor : createdActors) { + watch(actor); + LOG.info("Killing actor {}", actor); + actor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + expectTerminated(duration("5 seconds"), actor); + } + }}; } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 42a7911be3..17bfebb816 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -1,8 +1,12 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; +import java.util.ArrayList; +import java.util.List; import org.junit.Test; import org.opendaylight.controller.cluster.raft.AbstractActorTest; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; @@ -17,12 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { private final ActorRef behaviorActor = getSystem().actorOf(Props.create( @@ -67,8 +65,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { throws Exception { new JavaTestKit(getSystem()) {{ - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + MockRaftActorContext context = createActorContext(); // First set the receivers term to a high number (1000) context.getTermInformation().update(1000, "test"); @@ -90,6 +87,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof AppendEntriesReply) { AppendEntriesReply reply = (AppendEntriesReply) in; @@ -112,8 +110,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new JavaTestKit(getSystem()) { { - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + MockRaftActorContext context = createActorContext(); // First set the receivers term to lower number context.getTermInformation().update(2, "test"); @@ -169,6 +166,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { RaftActorBehavior behavior = createBehavior( @@ -185,6 +183,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof RequestVoteReply) { RequestVoteReply reply = @@ -213,6 +212,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { RaftActorContext actorContext = @@ -238,6 +238,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof RequestVoteReply) { RequestVoteReply reply = @@ -268,6 +269,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { RaftActorContext context = @@ -284,6 +286,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof RequestVoteReply) { RequestVoteReply reply = @@ -371,11 +374,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { return createBehavior(createActorContext()); } - protected RaftActorContext createActorContext() { + protected MockRaftActorContext createActorContext() { return new MockRaftActorContext(); } - protected RaftActorContext createActorContext(ActorRef actor) { + protected MockRaftActorContext createActorContext(ActorRef actor) { return new MockRaftActorContext("test", getSystem(), actor); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 0dc68c2461..0a715b2d04 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -1,5 +1,6 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; @@ -20,8 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import static org.junit.Assert.assertEquals; - public class CandidateTest extends AbstractRaftActorBehaviorTest { private final ActorRef candidateActor = getSystem().actorOf(Props.create( @@ -81,12 +80,14 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { new JavaTestKit(getSystem()) {{ new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) { + @Override protected void run() { Candidate candidate = new Candidate(createActorContext(getTestActor())); final Boolean out = new ExpectMsg(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof ElectionTimeout) { return true; @@ -117,7 +118,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { @Test public void testHandleElectionTimeoutWhenThereAreTwoNodesInCluster(){ MockRaftActorContext raftActorContext = - (MockRaftActorContext) createActorContext(); + createActorContext(); raftActorContext.setPeerAddresses(onePeer); Candidate candidate = new Candidate(raftActorContext); @@ -131,7 +132,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { @Test public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodesInCluster(){ MockRaftActorContext raftActorContext = - (MockRaftActorContext) createActorContext(); + createActorContext(); raftActorContext.setPeerAddresses(twoPeers); Candidate candidate = new Candidate(raftActorContext); @@ -145,7 +146,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { @Test public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodesInCluster(){ MockRaftActorContext raftActorContext = - (MockRaftActorContext) createActorContext(); + createActorContext(); raftActorContext.setPeerAddresses(fourPeers); Candidate candidate = new Candidate(raftActorContext); @@ -164,6 +165,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { Candidate candidate = new Candidate(createActorContext(getTestActor())); @@ -172,6 +174,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesResponse") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof AppendEntriesReply) { AppendEntriesReply reply = (AppendEntriesReply) in; @@ -193,6 +196,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { Candidate candidate = new Candidate(createActorContext(getTestActor())); @@ -201,6 +205,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesResponse") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof RequestVoteReply) { RequestVoteReply reply = (RequestVoteReply) in; @@ -222,6 +227,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { RaftActorContext context = createActorContext(getTestActor()); @@ -236,6 +242,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof RequestVoteReply) { RequestVoteReply reply = (RequestVoteReply) in; @@ -257,6 +264,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { RaftActorContext context = createActorContext(getTestActor()); @@ -269,6 +277,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof RequestVoteReply) { RequestVoteReply reply = (RequestVoteReply) in; @@ -291,7 +300,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { return new Candidate(actorContext); } - @Override protected RaftActorContext createActorContext() { + @Override protected MockRaftActorContext createActorContext() { return new MockRaftActorContext("test", getSystem(), candidateActor); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 719a8256a0..6b0857351d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -41,11 +41,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { return new Follower(actorContext); } - @Override protected RaftActorContext createActorContext() { + @Override + protected MockRaftActorContext createActorContext() { return createActorContext(followerActor); } - protected RaftActorContext createActorContext(ActorRef actorRef){ + @Override + protected MockRaftActorContext createActorContext(ActorRef actorRef){ return new MockRaftActorContext("test", getSystem(), actorRef); } @@ -54,12 +56,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new JavaTestKit(getSystem()) {{ new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) { + @Override protected void run() { Follower follower = new Follower(createActorContext(getTestActor())); final Boolean out = new ExpectMsg(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof ElectionTimeout) { return true; @@ -92,6 +96,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { RaftActorContext context = createActorContext(getTestActor()); @@ -104,6 +109,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof RequestVoteReply) { RequestVoteReply reply = (RequestVoteReply) in; @@ -125,6 +131,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { RaftActorContext context = createActorContext(getTestActor()); @@ -137,6 +144,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof RequestVoteReply) { RequestVoteReply reply = (RequestVoteReply) in; @@ -203,8 +211,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { throws Exception { new JavaTestKit(getSystem()) {{ - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + MockRaftActorContext context = createActorContext(); // First set the receivers term to lower number context.getTermInformation().update(95, "test"); @@ -233,6 +240,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof AppendEntriesReply) { AppendEntriesReply reply = (AppendEntriesReply) in; @@ -263,8 +271,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { public void testHandleAppendEntriesAddNewEntries() throws Exception { new JavaTestKit(getSystem()) {{ - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + MockRaftActorContext context = createActorContext(); // First set the receivers term to lower number context.getTermInformation().update(1, "test"); @@ -312,6 +319,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof AppendEntriesReply) { AppendEntriesReply reply = (AppendEntriesReply) in; @@ -343,8 +351,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { throws Exception { new JavaTestKit(getSystem()) {{ - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + MockRaftActorContext context = createActorContext(); // First set the receivers term to lower number context.getTermInformation().update(2, "test"); @@ -405,6 +412,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof AppendEntriesReply) { AppendEntriesReply reply = (AppendEntriesReply) in; @@ -425,8 +433,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { public void testHandleAppendEntriesPreviousLogEntryMissing(){ new JavaTestKit(getSystem()) {{ - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + MockRaftActorContext context = createActorContext(); // Prepare the receivers log MockRaftActorContext.SimpleReplicatedLog log = @@ -462,6 +469,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof AppendEntriesReply) { AppendEntriesReply reply = (AppendEntriesReply) in; @@ -482,8 +490,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { public void testHandleAppendAfterInstallingSnapshot(){ new JavaTestKit(getSystem()) {{ - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + MockRaftActorContext context = createActorContext(); // Prepare the receivers log @@ -518,6 +525,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesReply") { // do not put code outside this method, will run afterwards + @Override protected Boolean match(Object in) { if (in instanceof AppendEntriesReply) { AppendEntriesReply reply = (AppendEntriesReply) in; @@ -548,8 +556,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { ActorRef leaderActor = getSystem().actorOf(Props.create( MessageCollectorActor.class)); - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(getRef()); + MockRaftActorContext context = createActorContext(getRef()); Follower follower = (Follower)createBehavior(context); @@ -641,8 +648,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { ActorRef leaderActor = getSystem().actorOf(Props.create( MessageCollectorActor.class)); - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(getRef()); + MockRaftActorContext context = createActorContext(getRef()); Follower follower = (Follower) createBehavior(context); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java index 708068a789..6197f980ea 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; @@ -18,15 +20,13 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest { - private ActorRef leaderActor = + private final ActorRef leaderActor = getSystem().actorOf(Props.create(DoNothingActor.class)); - private ActorRef senderActor = + private final ActorRef senderActor = getSystem().actorOf(Props.create(DoNothingActor.class)); @Override @@ -36,7 +36,7 @@ public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest { } @Override - protected RaftActorContext createActorContext() { + protected MockRaftActorContext createActorContext() { return createActorContext(leaderActor); } @@ -47,7 +47,7 @@ public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest { String followerAddress1 = "akka://test/user/$a"; String followerAddress2 = "akka://test/user/$b"; - MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext(); + MockRaftActorContext leaderActorContext = createActorContext(); Map peerAddresses = new HashMap<>(); peerAddresses.put("follower-1", followerAddress1); peerAddresses.put("follower-2", followerAddress2); @@ -80,7 +80,7 @@ public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest { String followerAddress3 = "akka://test/user/$c"; String followerAddress4 = "akka://test/user/$d"; - MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext(); + MockRaftActorContext leaderActorContext = createActorContext(); Map peerAddresses = new HashMap<>(); peerAddresses.put("follower-1", followerAddress1); peerAddresses.put("follower-2", followerAddress2); @@ -118,7 +118,7 @@ public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest { String followerAddress1 = "akka://test/user/$a"; String followerAddress2 = "akka://test/user/$b"; - MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext(); + MockRaftActorContext leaderActorContext = createActorContext(); Map peerAddresses = new HashMap<>(); peerAddresses.put("follower-1", followerAddress1); peerAddresses.put("follower-2", followerAddress2); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 119b958799..f087793674 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -10,6 +10,7 @@ import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import java.io.ByteArrayOutputStream; @@ -19,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; @@ -28,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -35,810 +38,658 @@ import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderChec import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; public class LeaderTest extends AbstractRaftActorBehaviorTest { - private final ActorRef leaderActor = - getSystem().actorOf(Props.create(DoNothingActor.class)); - private final ActorRef senderActor = - getSystem().actorOf(Props.create(DoNothingActor.class)); + static final String FOLLOWER_ID = "follower"; - @Test - public void testHandleMessageForUnknownMessage() throws Exception { - new JavaTestKit(getSystem()) {{ - Leader leader = - new Leader(createActorContext()); + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); - // handle message should return the Leader state when it receives an - // unknown message - RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo"); - Assert.assertTrue(behavior instanceof Leader); - }}; + private final TestActorRef leaderActor = actorFactory.createTestActor( + Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader")); + + private final TestActorRef followerActor = actorFactory.createTestActor( + Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower")); + + private Leader leader; + + @After + public void tearDown() throws Exception { + if(leader != null) { + leader.close(); + } + + actorFactory.close(); + } + + private void logStart(String name) { + LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name); } @Test - public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() { - new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - @Override - protected void run() { - ActorRef followerActor = getTestActor(); + public void testHandleMessageForUnknownMessage() throws Exception { + logStart("testHandleMessageForUnknownMessage"); + + leader = new Leader(createActorContext()); - MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); + // handle message should return the Leader state when it receives an + // unknown message + RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo"); + Assert.assertTrue(behavior instanceof Leader); + } - Map peerAddresses = new HashMap<>(); + @Test + public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception { + logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); - String followerId = "follower"; - peerAddresses.put(followerId, followerActor.path().toString()); + MockRaftActorContext actorContext = createActorContextWithFollower(); - actorContext.setPeerAddresses(peerAddresses); + long term = 1; + actorContext.getTermInformation().update(term, ""); - long term = 1; - actorContext.getTermInformation().update(term, ""); + leader = new Leader(actorContext); - Leader leader = new Leader(actorContext); + // Leader should send an immediate heartbeat with no entries as follower is inactive. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getTerm", term, appendEntries.getTerm()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); - // Leader should send an immediate heartbeat with no entries as follower is inactive. - long lastIndex = actorContext.getReplicatedLog().lastIndex(); - AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); - assertEquals("getTerm", term, appendEntries.getTerm()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); - assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm()); - assertEquals("Entries size", 0, appendEntries.getEntries().size()); + // The follower would normally reply - simulate that explicitly here. + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex - 1, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); - // The follower would normally reply - simulate that explicitly here. - leader.handleMessage(followerActor, new AppendEntriesReply( - followerId, term, true, lastIndex - 1, term)); - assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive()); + followerActor.underlyingActor().clear(); - // Sleep for the heartbeat interval so AppendEntries is sent. - Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). - getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). + getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(senderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, new SendHeartBeat()); - appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); - assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); - assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); - assertEquals("Entries size", 1, appendEntries.getEntries().size()); - assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); - assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); - } - }; - }}; + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); } @Test - public void testHandleReplicateMessageSendAppendEntriesToFollower() { - new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - @Override - protected void run() { - ActorRef followerActor = getTestActor(); - - MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); - - Map peerAddresses = new HashMap<>(); + public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception { + logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); - String followerId = "follower"; - peerAddresses.put(followerId, followerActor.path().toString()); + MockRaftActorContext actorContext = createActorContextWithFollower(); - actorContext.setPeerAddresses(peerAddresses); + long term = 1; + actorContext.getTermInformation().update(term, ""); - long term = 1; - actorContext.getTermInformation().update(term, ""); + leader = new Leader(actorContext); - Leader leader = new Leader(actorContext); + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // Leader will send an immediate heartbeat - ignore it. - expectMsgClass(duration("5 seconds"), AppendEntries.class); + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); - // The follower would normally reply - simulate that explicitly here. - long lastIndex = actorContext.getReplicatedLog().lastIndex(); - leader.handleMessage(followerActor, new AppendEntriesReply( - followerId, term, true, lastIndex, term)); - assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive()); + followerActor.underlyingActor().clear(); - MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 1, lastIndex + 1, payload); - actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, - new Replicate(null, null, newEntry)); + MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 1, lastIndex + 1, payload); + actorContext.getReplicatedLog().append(newEntry); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new Replicate(null, null, newEntry)); - // State should not change - assertTrue(raftBehavior instanceof Leader); + // State should not change + assertTrue(raftBehavior instanceof Leader); - AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); - assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); - assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); - assertEquals("Entries size", 1, appendEntries.getEntries().size()); - assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); - assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); - assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); - } - }; - }}; + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); } @Test - public void testHandleReplicateMessageWhenThereAreNoFollowers() { - new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - @Override - protected void run() { + public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { + logStart("testHandleReplicateMessageWhenThereAreNoFollowers"); - ActorRef raftActor = getTestActor(); + MockRaftActorContext actorContext = createActorContext(); - MockRaftActorContext actorContext = - new MockRaftActorContext("test", getSystem(), raftActor); + leader = new Leader(actorContext); - actorContext.getReplicatedLog().removeFrom(0); + actorContext.setLastApplied(0); - actorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1) - .build()); + long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; + long term = actorContext.getTermInformation().getCurrentTerm(); + MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( + term, newLogIndex, new MockRaftActorContext.MockPayload("foo")); - Leader leader = new Leader(actorContext); - RaftActorBehavior raftBehavior = leader - .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1))); + actorContext.getReplicatedLog().append(newEntry); - // State should not change - assertTrue(raftBehavior instanceof Leader); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new Replicate(leaderActor, "state-id", newEntry)); - assertEquals(1, actorContext.getCommitIndex()); + // State should not change + assertTrue(raftBehavior instanceof Leader); - final String out = - new ExpectMsg(duration("1 seconds"), - "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in instanceof ApplyState) { - if (((ApplyState) in).getIdentifier().equals("state-id")) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex()); - assertEquals("match", out); + // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous + // one since lastApplied state is 0. + List applyStateList = MessageCollectorActor.getAllMatching( + leaderActor, ApplyState.class); + assertEquals("ApplyState count", newLogIndex, applyStateList.size()); - } - }; - }}; + for(int i = 0; i <= newLogIndex - 1; i++ ) { + ApplyState applyState = applyStateList.get(i); + assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm()); + } + + ApplyState last = applyStateList.get((int) newLogIndex - 1); + assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); + assertEquals("getIdentifier", "state-id", last.getIdentifier()); } @Test public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception { - new JavaTestKit(getSystem()) {{ - ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); - - Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); - - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(leaderActor); - actorContext.setPeerAddresses(peerAddresses); - - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); - - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; - - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.setCommitIndex(followersLastIndex); - //set follower timeout to 2 mins, helps during debugging - actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10)); - - MockLeader leader = new MockLeader(actorContext); - - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); + logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot"); - //update follower timestamp - leader.markFollowerActive(followerActor.path().toString()); + MockRaftActorContext actorContext = createActorContextWithFollower(); - ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); - leader.createFollowerToSnapshot(followerActor.path().toString(), bs); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - //send first chunk and no InstallSnapshotReply received yet - leader.getFollowerToSnapshot().getNextChunk(); - leader.getFollowerToSnapshot().incrementChunkIndex(); + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); - Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - leader.handleMessage(leaderActor, new SendHeartBeat()); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setCommitIndex(followersLastIndex); + //set follower timeout to 2 mins, helps during debugging + actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10)); - AppendEntries aeproto = MessageCollectorActor.getFirstMatching( - followerActor, AppendEntries.class); + leader = new Leader(actorContext); - assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " + - "received", aeproto); + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); - AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto); + //update follower timestamp + leader.markFollowerActive(FOLLOWER_ID); - assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty()); + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + leader.setFollowerSnapshot(FOLLOWER_ID, fts); - //InstallSnapshotReply received - leader.getFollowerToSnapshot().markSendStatus(true); + //send first chunk and no InstallSnapshotReply received yet + fts.getNextChunk(); + fts.incrementChunkIndex(); - leader.handleMessage(senderActor, new SendHeartBeat()); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); - InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor, - InstallSnapshot.SERIALIZABLE_CLASS); + leader.handleMessage(leaderActor, new SendHeartBeat()); - assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot", - isproto); + AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto); + AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto); - assertEquals(snapshotIndex, is.getLastIncludedIndex()); + assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty()); - }}; - } + //InstallSnapshotReply received + fts.markSendStatus(true); - @Test - public void testSendAppendEntriesSnapshotScenario() { - new JavaTestKit(getSystem()) {{ + leader.handleMessage(leaderActor, new SendHeartBeat()); - ActorRef followerActor = getTestActor(); + InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.SERIALIZABLE_CLASS); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(getRef()); - actorContext.setPeerAddresses(peerAddresses); + assertEquals(snapshotIndex, is.getLastIncludedIndex()); + } - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + @Test + public void testSendAppendEntriesSnapshotScenario() throws Exception { + logStart("testSendAppendEntriesSnapshotScenario"); - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.setCommitIndex(followersLastIndex); + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); - Leader leader = new Leader(actorContext); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setCommitIndex(followersLastIndex); - //update follower timestamp - leader.markFollowerActive(followerActor.path().toString()); + leader = new Leader(actorContext); - Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex - RaftActorBehavior raftBehavior = leader.handleMessage( - senderActor, new Replicate(null, "state-id", entry)); + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); - assertTrue(raftBehavior instanceof Leader); + //update follower timestamp + leader.markFollowerActive(FOLLOWER_ID); - // we might receive some heartbeat messages, so wait till we get CaptureSnapshot - Boolean[] matches = new ReceiveWhile(Boolean.class, duration("2 seconds")) { - @Override - protected Boolean match(Object o) throws Exception { - if (o instanceof CaptureSnapshot) { - return true; - } - return false; - } - }.get(); + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex + RaftActorBehavior raftBehavior = leader.handleMessage( + leaderActor, new Replicate(null, "state-id", entry)); - boolean captureSnapshot = false; - for (Boolean b: matches) { - captureSnapshot = b | captureSnapshot; - } + assertTrue(raftBehavior instanceof Leader); - assertTrue(captureSnapshot); - }}; + MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); } @Test public void testInitiateInstallSnapshot() throws Exception { - new JavaTestKit(getSystem()) {{ + logStart("testInitiateInstallSnapshot"); - ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + MockRaftActorContext actorContext = createActorContextWithFollower(); - ActorRef followerActor = getTestActor(); - - Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), followerActor.path().toString()); - - MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(leaderActor); - actorContext.setPeerAddresses(peerAddresses); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setLastApplied(3); + actorContext.setCommitIndex(followersLastIndex); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.setLastApplied(3); - actorContext.setCommitIndex(followersLastIndex); + leader = new Leader(actorContext); - Leader leader = new Leader(actorContext); - // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(Optional.absent()); + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // new entry - ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); + // set the snapshot as absent and check if capture-snapshot is invoked. + leader.setSnapshot(Optional.absent()); - actorContext.getReplicatedLog().append(entry); + // new entry + ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); - //update follower timestamp - leader.markFollowerActive(followerActor.path().toString()); + actorContext.getReplicatedLog().append(entry); - RaftActorBehavior raftBehavior = leader.handleMessage( - senderActor, new Replicate(null, "state-id", entry)); + //update follower timestamp + leader.markFollowerActive(FOLLOWER_ID); - CaptureSnapshot cs = MessageCollectorActor. - getFirstMatching(leaderActor, CaptureSnapshot.class); + leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - assertNotNull(cs); + CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); - assertTrue(cs.isInstallSnapshotInitiated()); - assertEquals(3, cs.getLastAppliedIndex()); - assertEquals(1, cs.getLastAppliedTerm()); - assertEquals(4, cs.getLastIndex()); - assertEquals(2, cs.getLastTerm()); + assertTrue(cs.isInstallSnapshotInitiated()); + assertEquals(3, cs.getLastAppliedIndex()); + assertEquals(1, cs.getLastAppliedTerm()); + assertEquals(4, cs.getLastIndex()); + assertEquals(2, cs.getLastTerm()); - // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(senderActor, new Replicate(null, "state-id", entry)); - List captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class); - assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size()); + // if an initiate is started again when first is in progress, it shouldnt initiate Capture + leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - }}; + List captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class); + assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size()); } @Test - public void testInstallSnapshot() { - new JavaTestKit(getSystem()) {{ + public void testInstallSnapshot() throws Exception { + logStart("testInstallSnapshot"); - ActorRef followerActor = getTestActor(); + MockRaftActorContext actorContext = createActorContextWithFollower(); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); - actorContext.setPeerAddresses(peerAddresses); + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setCommitIndex(followersLastIndex); - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + leader = new Leader(actorContext); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; + // Ignore initial heartbeat. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - actorContext.setCommitIndex(followersLastIndex); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(toByteString(leadersSnapshot))); - Leader leader = new Leader(actorContext); + assertTrue(raftBehavior instanceof Leader); - // Ignore initial heartbeat. - expectMsgClass(duration("5 seconds"), AppendEntries.class); + // check if installsnapshot gets called with the correct values. - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); + InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable( + MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class)); - RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, - new SendInstallSnapshot(toByteString(leadersSnapshot))); + assertNotNull(installSnapshot.getData()); + assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); - assertTrue(raftBehavior instanceof Leader); - - // check if installsnapshot gets called with the correct values. - final String out = - new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in instanceof InstallSnapshotMessages.InstallSnapshot) { - InstallSnapshot is = (InstallSnapshot) - SerializationUtils.fromSerializable(in); - if (is.getData() == null) { - return "InstallSnapshot data is null"; - } - if (is.getLastIncludedIndex() != snapshotIndex) { - return is.getLastIncludedIndex() + "!=" + snapshotIndex; - } - if (is.getLastIncludedTerm() != snapshotTerm) { - return is.getLastIncludedTerm() + "!=" + snapshotTerm; - } - if (is.getTerm() == currentTerm) { - return is.getTerm() + "!=" + currentTerm; - } - - return "match"; - - } else { - return "message mismatch:" + in.getClass(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - }}; + // FIXME - we don't set the term in the serialized message. + //assertEquals(currentTerm, installSnapshot.getTerm()); } @Test - public void testHandleInstallSnapshotReplyLastChunk() { - new JavaTestKit(getSystem()) {{ + public void testHandleInstallSnapshotReplyLastChunk() throws Exception { + logStart("testHandleInstallSnapshotReplyLastChunk"); - ActorRef followerActor = getTestActor(); + MockRaftActorContext actorContext = createActorContextWithFollower(); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); - - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; - - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); - actorContext.setPeerAddresses(peerAddresses); - actorContext.setCommitIndex(followersLastIndex); - - MockLeader leader = new MockLeader(actorContext); - - // Ignore initial heartbeat. - expectMsgClass(duration("5 seconds"), AppendEntries.class); - - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - - // set the snapshot variables in replicatedlog - - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - - ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); - leader.createFollowerToSnapshot(followerActor.path().toString(), bs); - while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) { - leader.getFollowerToSnapshot().getNextChunk(); - leader.getFollowerToSnapshot().incrementChunkIndex(); - } + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + actorContext.setCommitIndex(followersLastIndex); - RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, - new InstallSnapshotReply(currentTerm, followerActor.path().toString(), - leader.getFollowerToSnapshot().getChunkIndex(), true)); + leader = new Leader(actorContext); - assertTrue(raftBehavior instanceof Leader); + // Ignore initial heartbeat. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - assertEquals(0, leader.followerSnapshotSize()); - assertEquals(1, leader.followerLogSize()); - assertNotNull(leader.getFollower(followerActor.path().toString())); - FollowerLogInformation fli = leader.getFollower(followerActor.path().toString()); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex + 1, fli.getNextIndex()); - }}; - } - @Test - public void testSendSnapshotfromInstallSnapshotReply() throws Exception { - new JavaTestKit(getSystem()) {{ + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - TestActorRef followerActor = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply"); + // set the snapshot variables in replicatedlog - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-reply", - followerActor.path().toString()); - - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int snapshotTerm = 1; - final int currentTerm = 2; - - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){ - @Override - public int getSnapshotChunkSize() { - return 50; - } - }; - configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); - configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + leader.setFollowerSnapshot(FOLLOWER_ID, fts); + while(!fts.isLastChunk(fts.getChunkIndex())) { + fts.getNextChunk(); + fts.incrementChunkIndex(); + } + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); - actorContext.setConfigParams(configParams); - actorContext.setPeerAddresses(peerAddresses); - actorContext.setCommitIndex(followersLastIndex); + RaftActorBehavior raftBehavior = leader.handleMessage(followerActor, + new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true)); - MockLeader leader = new MockLeader(actorContext); + assertTrue(raftBehavior instanceof Leader); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + assertEquals(0, leader.followerSnapshotSize()); + assertEquals(1, leader.followerLogSize()); + FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID); + assertNotNull(fli); + assertEquals(snapshotIndex, fli.getMatchIndex()); + assertEquals(snapshotIndex, fli.getMatchIndex()); + assertEquals(snapshotIndex + 1, fli.getNextIndex()); + } + + @Test + public void testSendSnapshotfromInstallSnapshotReply() throws Exception { + logStart("testSendSnapshotfromInstallSnapshotReply"); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + MockRaftActorContext actorContext = createActorContextWithFollower(); - ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){ + @Override + public int getSnapshotChunkSize() { + return 50; + } + }; + configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); - List objectList = MessageCollectorActor.getAllMatching(followerActor, - InstallSnapshotMessages.InstallSnapshot.class); + actorContext.setConfigParams(configParams); + actorContext.setCommitIndex(followersLastIndex); - assertEquals(1, objectList.size()); + leader = new Leader(actorContext); - Object o = objectList.get(0); - assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - assertEquals(1, installSnapshot.getChunkIndex()); - assertEquals(3, installSnapshot.getTotalChunks()); + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); - leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), - "follower-reply", installSnapshot.getChunkIndex(), true)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - objectList = MessageCollectorActor.getAllMatching(followerActor, - InstallSnapshotMessages.InstallSnapshot.class); + InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); - assertEquals(2, objectList.size()); + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); - installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1); + followerActor.underlyingActor().clear(); + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), - "follower-reply", installSnapshot.getChunkIndex(), true)); + installSnapshot = MessageCollectorActor.expectFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); - objectList = MessageCollectorActor.getAllMatching(followerActor, - InstallSnapshotMessages.InstallSnapshot.class); + assertEquals(2, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(3, objectList.size()); + followerActor.underlyingActor().clear(); + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2); + installSnapshot = MessageCollectorActor.expectFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); - // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower - leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), - "follower-reply", installSnapshot.getChunkIndex(), true)); + // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower + followerActor.underlyingActor().clear(); + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - objectList = MessageCollectorActor.getAllMatching(followerActor, - InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); - // Count should still stay at 3 - assertEquals(3, objectList.size()); - }}; + Assert.assertNull(installSnapshot); } @Test public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{ - new JavaTestKit(getSystem()) {{ - - TestActorRef followerActor = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower"); - - Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex"); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int snapshotTerm = 1; - final int currentTerm = 2; + MockRaftActorContext actorContext = createActorContextWithFollower(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; - actorContext.setConfigParams(new DefaultConfigParamsImpl(){ - @Override - public int getSnapshotChunkSize() { - return 50; - } - }); - actorContext.setPeerAddresses(peerAddresses); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); - MockLeader leader = new MockLeader(actorContext); + actorContext.setCommitIndex(followersLastIndex); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + leader = new Leader(actorContext); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); - MessageCollectorActor.getAllMatching(followerActor, - InstallSnapshotMessages.InstallSnapshot.class); + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); - assertNotNull(installSnapshot); + InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); - assertEquals(1, installSnapshot.getChunkIndex()); - assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); - followerActor.underlyingActor().clear(); + followerActor.underlyingActor().clear(); - leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), - followerActor.path().toString(), -1, false)); + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + FOLLOWER_ID, -1, false)); - Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); - - installSnapshot = MessageCollectorActor.getFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); - assertNotNull(installSnapshot); + leader.handleMessage(leaderActor, new SendHeartBeat()); - assertEquals(1, installSnapshot.getChunkIndex()); - assertEquals(3, installSnapshot.getTotalChunks()); + installSnapshot = MessageCollectorActor.expectFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); - followerActor.tell(PoisonPill.getInstance(), getRef()); - }}; + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); } @Test public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception { - new JavaTestKit(getSystem()) { - { - TestActorRef followerActor = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk"); - - Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk"); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int snapshotTerm = 1; - final int currentTerm = 2; + MockRaftActorContext actorContext = createActorContextWithFollower(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; - actorContext.setConfigParams(new DefaultConfigParamsImpl() { - @Override - public int getSnapshotChunkSize() { - return 50; - } - }); - actorContext.setPeerAddresses(peerAddresses); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); - MockLeader leader = new MockLeader(actorContext); + actorContext.setCommitIndex(followersLastIndex); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + leader = new Leader(actorContext); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); - InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); - assertNotNull(installSnapshot); + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - assertEquals(1, installSnapshot.getChunkIndex()); - assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode()); + InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); - int hashCode = installSnapshot.getData().hashCode(); + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode()); - followerActor.underlyingActor().clear(); + int hashCode = installSnapshot.getData().hashCode(); - leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true )); + followerActor.underlyingActor().clear(); - installSnapshot = MessageCollectorActor.getFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); - assertNotNull(installSnapshot); + leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(), + FOLLOWER_ID, 1, true)); - assertEquals(2, installSnapshot.getChunkIndex()); - assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(hashCode, installSnapshot.getLastChunkHashCode()); + installSnapshot = MessageCollectorActor.expectFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); - followerActor.tell(PoisonPill.getInstance(), getRef()); - }}; + assertEquals(2, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(hashCode, installSnapshot.getLastChunkHashCode()); } @Test public void testFollowerToSnapshotLogic() { + logStart("testFollowerToSnapshotLogic"); - MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); + MockRaftActorContext actorContext = createActorContext(); actorContext.setConfigParams(new DefaultConfigParamsImpl() { @Override @@ -847,7 +698,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } }); - MockLeader leader = new MockLeader(actorContext); + leader = new Leader(actorContext); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -857,7 +708,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { ByteString bs = toByteString(leadersSnapshot); byte[] barray = bs.toByteArray(); - leader.createFollowerToSnapshot("followerId", bs); + FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + leader.setFollowerSnapshot(FOLLOWER_ID, fts); + assertEquals(bs.size(), barray.length); int chunkIndex=0; @@ -869,35 +722,46 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { j = barray.length; } - ByteString chunk = leader.getFollowerToSnapshot().getNextChunk(); + ByteString chunk = fts.getNextChunk(); assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size()); - assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex()); + assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex()); - leader.getFollowerToSnapshot().markSendStatus(true); - if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) { - leader.getFollowerToSnapshot().incrementChunkIndex(); + fts.markSendStatus(true); + if (!fts.isLastChunk(chunkIndex)) { + fts.incrementChunkIndex(); } } - assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks()); + assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); } - @Override protected RaftActorBehavior createBehavior( RaftActorContext actorContext) { return new Leader(actorContext); } - @Override protected RaftActorContext createActorContext() { + @Override + protected MockRaftActorContext createActorContext() { return createActorContext(leaderActor); } @Override - protected RaftActorContext createActorContext(ActorRef actorRef) { + protected MockRaftActorContext createActorContext(ActorRef actorRef) { + return createActorContext("leader", actorRef); + } + + private MockRaftActorContext createActorContextWithFollower() { + MockRaftActorContext actorContext = createActorContext(); + actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, + followerActor.path().toString()).build()); + return actorContext; + } + + private MockRaftActorContext createActorContext(String id, ActorRef actorRef) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); - MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef); + MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef); context.setConfigParams(configParams); return context; } @@ -945,310 +809,248 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { - new JavaTestKit(getSystem()) {{ - TestActorRef leaderActor = TestActorRef.create(getSystem(), - Props.create(ForwardMessageToBehaviorActor.class)); + logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); - MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); - TestActorRef followerActor = TestActorRef.create(getSystem(), - ForwardMessageToBehaviorActor.props()); + MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); - MockRaftActorContext followerActorContext = - new MockRaftActorContext("follower", getSystem(), followerActor); + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().behavior = follower; - Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + Map peerAddresses = new HashMap<>(); + peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower", followerActor.path().toString()); + leaderActorContext.setPeerAddresses(peerAddresses); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.getReplicatedLog().removeFrom(0); - leaderActorContext.getReplicatedLog().removeFrom(0); + //create 3 entries + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - //create 3 entries - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + leaderActorContext.setCommitIndex(1); - leaderActorContext.setCommitIndex(1); + followerActorContext.getReplicatedLog().removeFrom(0); - followerActorContext.getReplicatedLog().removeFrom(0); + // follower too has the exact same log entries and has the same commit index + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - // follower too has the exact same log entries and has the same commit index - followerActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + followerActorContext.setCommitIndex(1); - followerActorContext.setCommitIndex(1); + leader = new Leader(leaderActorContext); - Leader leader = new Leader(leaderActorContext); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); - assertNotNull(appendEntries); + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(0, appendEntries.getEntries().size()); + assertEquals(0, appendEntries.getPrevLogIndex()); - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(0, appendEntries.getEntries().size()); - assertEquals(0, appendEntries.getPrevLogIndex()); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching( + leaderActor, AppendEntriesReply.class); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( - leaderActor, AppendEntriesReply.class); - assertNotNull(appendEntriesReply); + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); - assertEquals(2, appendEntriesReply.getLogLastIndex()); - assertEquals(1, appendEntriesReply.getLogLastTerm()); + // follower returns its next index + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); - // follower returns its next index - assertEquals(2, appendEntriesReply.getLogLastIndex()); - assertEquals(1, appendEntriesReply.getLogLastTerm()); - }}; + follower.close(); } - @Test public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { - new JavaTestKit(getSystem()) {{ - TestActorRef leaderActor = TestActorRef.create(getSystem(), - Props.create(ForwardMessageToBehaviorActor.class)); + logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex"); - MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + MockRaftActorContext leaderActorContext = createActorContext(); - TestActorRef followerActor = TestActorRef.create(getSystem(), - ForwardMessageToBehaviorActor.props()); + MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); - MockRaftActorContext followerActorContext = - new MockRaftActorContext("follower", getSystem(), followerActor); + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().behavior = follower; - Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + Map peerAddresses = new HashMap<>(); + peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower", followerActor.path().toString()); + leaderActorContext.setPeerAddresses(peerAddresses); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.getReplicatedLog().removeFrom(0); - leaderActorContext.getReplicatedLog().removeFrom(0); + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + leaderActorContext.setCommitIndex(1); - leaderActorContext.setCommitIndex(1); + followerActorContext.getReplicatedLog().removeFrom(0); - followerActorContext.getReplicatedLog().removeFrom(0); + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - followerActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + // follower has the same log entries but its commit index > leaders commit index + followerActorContext.setCommitIndex(2); - // follower has the same log entries but its commit index > leaders commit index - followerActorContext.setCommitIndex(2); + leader = new Leader(leaderActorContext); - Leader leader = new Leader(leaderActorContext); + // Initial heartbeat + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // Initial heartbeat - AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); - assertNotNull(appendEntries); + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(0, appendEntries.getEntries().size()); + assertEquals(0, appendEntries.getPrevLogIndex()); - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(0, appendEntries.getEntries().size()); - assertEquals(0, appendEntries.getPrevLogIndex()); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching( + leaderActor, AppendEntriesReply.class); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( - leaderActor, AppendEntriesReply.class); - assertNotNull(appendEntriesReply); + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); - assertEquals(2, appendEntriesReply.getLogLastIndex()); - assertEquals(1, appendEntriesReply.getLogLastTerm()); + leaderActor.underlyingActor().behavior = leader; + leader.handleMessage(followerActor, appendEntriesReply); - leaderActor.underlyingActor().behavior = leader; - leader.handleMessage(followerActor, appendEntriesReply); + leaderActor.underlyingActor().clear(); + followerActor.underlyingActor().clear(); - leaderActor.underlyingActor().clear(); - followerActor.underlyingActor().clear(); + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); - leader.handleMessage(leaderActor, new SendHeartBeat()); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); - assertNotNull(appendEntries); + assertEquals(2, appendEntries.getLeaderCommit()); + assertEquals(0, appendEntries.getEntries().size()); + assertEquals(2, appendEntries.getPrevLogIndex()); - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(0, appendEntries.getEntries().size()); - assertEquals(2, appendEntries.getPrevLogIndex()); + appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); - assertNotNull(appendEntriesReply); + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); - assertEquals(2, appendEntriesReply.getLogLastIndex()); - assertEquals(1, appendEntriesReply.getLogLastTerm()); + assertEquals(2, followerActorContext.getCommitIndex()); - assertEquals(1, followerActorContext.getCommitIndex()); - }}; + follower.close(); } @Test public void testHandleAppendEntriesReplyFailure(){ - new JavaTestKit(getSystem()) { - { + logStart("testHandleAppendEntriesReplyFailure"); - ActorRef leaderActor = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); - ActorRef followerActor = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); + leader = new Leader(leaderActorContext); + // Send initial heartbeat reply with last index. + leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1)); - MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 11, followerInfo.getNextIndex()); - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-1", - followerActor.path().toString()); + AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1); - leaderActorContext.setPeerAddresses(peerAddresses); + RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); - Leader leader = new Leader(leaderActorContext); + assertEquals(RaftState.Leader, raftActorBehavior.state()); - AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1); - - RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); - - assertEquals(RaftState.Leader, raftActorBehavior.state()); - - }}; + assertEquals("getNextIndex", 10, followerInfo.getNextIndex()); } @Test public void testHandleAppendEntriesReplySuccess() throws Exception { - new JavaTestKit(getSystem()) { - { - - ActorRef leaderActor = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); - - ActorRef followerActor = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); - + logStart("testHandleAppendEntriesReplySuccess"); - MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-1", - followerActor.path().toString()); + leaderActorContext.setCommitIndex(1); + leaderActorContext.setLastApplied(1); + leaderActorContext.getTermInformation().update(1, "leader"); - leaderActorContext.setPeerAddresses(peerAddresses); - leaderActorContext.setCommitIndex(1); - leaderActorContext.setLastApplied(1); - leaderActorContext.getTermInformation().update(1, "leader"); + leader = new Leader(leaderActorContext); - Leader leader = new Leader(leaderActorContext); + AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1); - AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1); + RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); - RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + assertEquals(RaftState.Leader, raftActorBehavior.state()); - assertEquals(RaftState.Leader, raftActorBehavior.state()); + assertEquals(2, leaderActorContext.getCommitIndex()); - assertEquals(2, leaderActorContext.getCommitIndex()); + ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching( + leaderActor, ApplyLogEntries.class); - ApplyLogEntries applyLogEntries = - MessageCollectorActor.getFirstMatching(leaderActor, - ApplyLogEntries.class); + assertEquals(2, leaderActorContext.getLastApplied()); - assertNotNull(applyLogEntries); + assertEquals(2, applyLogEntries.getToIndex()); - assertEquals(2, leaderActorContext.getLastApplied()); + List applyStateList = MessageCollectorActor.getAllMatching(leaderActor, + ApplyState.class); - assertEquals(2, applyLogEntries.getToIndex()); + assertEquals(1,applyStateList.size()); - List applyStateList = MessageCollectorActor.getAllMatching(leaderActor, - ApplyState.class); + ApplyState applyState = applyStateList.get(0); - assertEquals(1,applyStateList.size()); - - ApplyState applyState = (ApplyState) applyStateList.get(0); - - assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); - - }}; + assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); } @Test public void testHandleAppendEntriesReplyUnknownFollower(){ - new JavaTestKit(getSystem()) { - { + logStart("testHandleAppendEntriesReplyUnknownFollower"); - ActorRef leaderActor = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); + MockRaftActorContext leaderActorContext = createActorContext(); - MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + leader = new Leader(leaderActorContext); - Leader leader = new Leader(leaderActorContext); + AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1); - AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1); + RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); - RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply); - - assertEquals(RaftState.Leader, raftActorBehavior.state()); - - }}; + assertEquals(RaftState.Leader, raftActorBehavior.state()); } @Test public void testHandleRequestVoteReply(){ - new JavaTestKit(getSystem()) { - { - - ActorRef leaderActor = - getSystem().actorOf(Props.create(MessageCollectorActor.class)); + logStart("testHandleRequestVoteReply"); - MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + MockRaftActorContext leaderActorContext = createActorContext(); - Leader leader = new Leader(leaderActorContext); + leader = new Leader(leaderActorContext); - RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true)); + // Should be a no-op. + RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor, + new RequestVoteReply(1, true)); - assertEquals(RaftState.Leader, raftActorBehavior.state()); + assertEquals(RaftState.Leader, raftActorBehavior.state()); - raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false)); + raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false)); - assertEquals(RaftState.Leader, raftActorBehavior.state()); - }}; + assertEquals(RaftState.Leader, raftActorBehavior.state()); } @Test public void testIsolatedLeaderCheckNoFollowers() { - new JavaTestKit(getSystem()) {{ - ActorRef leaderActor = getTestActor(); - - MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + logStart("testIsolatedLeaderCheckNoFollowers"); - Map peerAddresses = new HashMap<>(); - leaderActorContext.setPeerAddresses(peerAddresses); + MockRaftActorContext leaderActorContext = createActorContext(); - Leader leader = new Leader(leaderActorContext); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue(behavior instanceof Leader); - }}; + leader = new Leader(leaderActorContext); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue(behavior instanceof Leader); } @Test public void testIsolatedLeaderCheckTwoFollowers() throws Exception { + logStart("testIsolatedLeaderCheckTwoFollowers"); + new JavaTestKit(getSystem()) {{ ActorRef followerActor1 = getTestActor(); ActorRef followerActor2 = getTestActor(); - MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext(); + MockRaftActorContext leaderActorContext = createActorContext(); Map peerAddresses = new HashMap<>(); peerAddresses.put("follower-1", followerActor1.path().toString()); @@ -1256,7 +1058,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leaderActorContext.setPeerAddresses(peerAddresses); - Leader leader = new Leader(leaderActorContext); + leader = new Leader(leaderActorContext); leader.stopIsolatedLeaderCheckSchedule(); leader.markFollowerActive("follower-1"); @@ -1289,118 +1091,83 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", behavior instanceof IsolatedLeader); - }}; } @Test public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { - new JavaTestKit(getSystem()) {{ - TestActorRef leaderActor = TestActorRef.create(getSystem(), - Props.create(MessageCollectorActor.class)); - - MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); - - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); - configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); - - leaderActorContext.setConfigParams(configParams); + logStart("testAppendEntryCallAtEndofAppendEntryReply"); - TestActorRef followerActor = TestActorRef.create(getSystem(), - ForwardMessageToBehaviorActor.props()); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); - MockRaftActorContext followerActorContext = - new MockRaftActorContext("follower-reply", getSystem(), followerActor); - - followerActorContext.setConfigParams(configParams); - - Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; - - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-reply", - followerActor.path().toString()); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setConfigParams(configParams); - leaderActorContext.getReplicatedLog().removeFrom(0); - leaderActorContext.setCommitIndex(-1); - leaderActorContext.setLastApplied(-1); + MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); - followerActorContext.getReplicatedLog().removeFrom(0); - followerActorContext.setCommitIndex(-1); - followerActorContext.setLastApplied(-1); + followerActorContext.setConfigParams(configParams); - Leader leader = new Leader(leaderActorContext); + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().behavior = follower; - AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( - leaderActor, AppendEntriesReply.class); - assertNotNull(appendEntriesReply); - System.out.println("appendEntriesReply: "+appendEntriesReply); - leader.handleMessage(followerActor, appendEntriesReply); + leaderActorContext.getReplicatedLog().removeFrom(0); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); - // Clear initial heartbeat messages + followerActorContext.getReplicatedLog().removeFrom(0); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); - leaderActor.underlyingActor().clear(); - followerActor.underlyingActor().clear(); + leader = new Leader(leaderActorContext); - // create 3 entries - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - leaderActorContext.setCommitIndex(1); - leaderActorContext.setLastApplied(1); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching( + leaderActor, AppendEntriesReply.class); - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); + leader.handleMessage(followerActor, appendEntriesReply); - leader.handleMessage(leaderActor, new SendHeartBeat()); + // Clear initial heartbeat messages - AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); - assertNotNull(appendEntries); + leaderActor.underlyingActor().clear(); + followerActor.underlyingActor().clear(); - // Should send first log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(0, appendEntries.getEntries().get(0).getIndex()); - assertEquals(-1, appendEntries.getPrevLogIndex()); + // create 3 entries + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + leaderActorContext.setCommitIndex(1); + leaderActorContext.setLastApplied(1); - appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); - assertNotNull(appendEntriesReply); + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); - assertEquals(1, appendEntriesReply.getLogLastTerm()); - assertEquals(0, appendEntriesReply.getLogLastIndex()); + leader.handleMessage(leaderActor, new SendHeartBeat()); - followerActor.underlyingActor().clear(); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - leader.handleAppendEntriesReply(followerActor, appendEntriesReply); + // Should send first log entry + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(0, appendEntries.getEntries().get(0).getIndex()); + assertEquals(-1, appendEntries.getPrevLogIndex()); - appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); - assertNotNull(appendEntries); + appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - // Should send second log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getEntries().get(0).getIndex()); - }}; - } + assertEquals(1, appendEntriesReply.getLogLastTerm()); + assertEquals(0, appendEntriesReply.getLogLastIndex()); - class MockLeader extends Leader { + followerActor.underlyingActor().clear(); - FollowerToSnapshot fts; + leader.handleAppendEntriesReply(followerActor, appendEntriesReply); - public MockLeader(RaftActorContext context){ - super(context); - } + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - public FollowerToSnapshot getFollowerToSnapshot() { - return fts; - } + // Should send second log entry + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(1, appendEntries.getEntries().get(0).getIndex()); - public void createFollowerToSnapshot(String followerId, ByteString bs ) { - fts = new FollowerToSnapshot(bs); - setFollowerSnapshot(followerId, fts); - } + follower.close(); } private class MockConfigParamsImpl extends DefaultConfigParamsImpl { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 79c90cf051..448c28e8c8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.junit.Assert; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -63,29 +64,39 @@ public class MessageCollectorActor extends UntypedActor { * @return */ public static T getFirstMatching(ActorRef actor, Class clazz) throws Exception { - for(int i = 0; i < 50; i++) { - List allMessages = getAllMessages(actor); + List allMessages = getAllMessages(actor); - for(Object message : allMessages){ - if(message.getClass().equals(clazz)){ - return (T) message; - } + for(Object message : allMessages){ + if(message.getClass().equals(clazz)){ + return (T) message; + } + } + + return null; + } + + public static T expectFirstMatching(ActorRef actor, Class clazz) throws Exception { + for(int i = 0; i < 50; i++) { + T message = getFirstMatching(actor, clazz); + if(message != null) { + return message; } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } + Assert.fail("Did not receive message of type " + clazz); return null; } - public static List getAllMatching(ActorRef actor, Class clazz) throws Exception { + public static List getAllMatching(ActorRef actor, Class clazz) throws Exception { List allMessages = getAllMessages(actor); - List output = Lists.newArrayList(); + List output = Lists.newArrayList(); for(Object message : allMessages){ if(message.getClass().equals(clazz)){ - output.add(message); + output.add((T) message); } }