X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=0255020328655dfe0dee151207f66f9a51eddcc5;hp=f087793674ec7f304192ec55871379f1adbfc03b;hb=4a60a637941ccf77ab7b32484cbc4128eaf3ea7c;hpb=03e752cbd625921ece92c5281cd4e1a8c81b3210 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index f087793674..0255020328 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -13,9 +13,6 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,8 +27,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; -import org.opendaylight.controller.cluster.raft.TestActorFactory; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; @@ -43,17 +39,16 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; -import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractRaftActorBehaviorTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; - - private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + public static final String LEADER_ID = "leader"; private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader")); @@ -63,17 +58,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { private Leader leader; + @Override @After public void tearDown() throws Exception { if(leader != null) { leader.close(); } - actorFactory.close(); - } - - private void logStart(String name) { - LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name); + super.tearDown(); } @Test @@ -128,6 +120,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 1, index, payload); + actorContext.getReplicatedLog().append(newEntry); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); + } + @Test public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); @@ -154,8 +155,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( 1, lastIndex + 1, payload); actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(null, null, newEntry)); + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -169,6 +169,218 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); } + @Test + public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception { + logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<5;i++) { + sendReplicate(actorContext, lastIndex+i+1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect only 1 message to be sent because of two reasons, + // - an append entries reply was not received + // - the heartbeat interval has not expired + // In this scenario if multiple messages are sent they would likely be duplicates + assertEquals("The number of append entries collected should be 1", 1, allMessages.size()); + } + + @Test + public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception { + logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<3;i++) { + sendReplicate(actorContext, lastIndex+i+1); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex + i + 1, term)); + + } + + for(int i=3;i<5;i++) { + sendReplicate(actorContext, lastIndex + i + 1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would + // get sent to the follower - but not the 5th + assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + + for(int i=0;i<4;i++) { + long expected = allMessages.get(i).getEntries().get(0).getIndex(); + assertEquals(expected, i+2); + } + } + + @Test + public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception { + logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(500, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + sendReplicate(actorContext, lastIndex+1); + + // Wait slightly longer than heartbeat duration + Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(1, allMessages.get(0).getEntries().size()); + assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + assertEquals(1, allMessages.get(1).getEntries().size()); + assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + + } + + @Test + public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception { + logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<3;i++) { + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 3", 3, allMessages.size()); + } + + @Test + public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception { + logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + sendReplicate(actorContext, lastIndex+1); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(0, allMessages.get(0).getEntries().size()); + assertEquals(1, allMessages.get(1).getEntries().size()); + } + + @Test public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { logStart("testHandleReplicateMessageWhenThereAreNoFollowers"); @@ -273,10 +485,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new SendHeartBeat()); - InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor, - InstallSnapshot.SERIALIZABLE_CLASS); - - InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto); + InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(snapshotIndex, is.getLastIncludedIndex()); } @@ -316,6 +525,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { new ReplicatedLogImplEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); + actorContext.getReplicatedLog().append(entry); + //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); @@ -325,7 +536,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertTrue(raftBehavior instanceof Leader); - MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); } @Test @@ -373,7 +584,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); + + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); @@ -384,8 +597,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { // if an initiate is started again when first is in progress, it shouldnt initiate Capture leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - List captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class); - assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size()); + Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @Test @@ -425,15 +637,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { // check if installsnapshot gets called with the correct values. - InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable( - MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class)); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertNotNull(installSnapshot.getData()); assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex()); assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); - // FIXME - we don't set the term in the serialized message. - //assertEquals(currentTerm, installSnapshot.getTerm()); + assertEquals(currentTerm, installSnapshot.getTerm()); } @Test @@ -531,8 +741,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -541,8 +750,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(2, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -551,16 +759,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower followerActor.underlyingActor().clear(); leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - installSnapshot = MessageCollectorActor.getFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class); Assert.assertNull(installSnapshot); } @@ -601,10 +807,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { ByteString bs = toByteString(leadersSnapshot); leader.setSnapshot(Optional.of(bs)); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -619,8 +825,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new SendHeartBeat()); - installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -663,12 +868,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode()); + assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); int hashCode = installSnapshot.getData().hashCode(); @@ -677,12 +881,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(), FOLLOWER_ID, 1, true)); - installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(2, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(hashCode, installSnapshot.getLastChunkHashCode()); + assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue()); } @Test @@ -747,12 +950,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef) { - return createActorContext("leader", actorRef); + return createActorContext(LEADER_ID, actorRef); } private MockRaftActorContext createActorContextWithFollower() { MockRaftActorContext actorContext = createActorContext(); - actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, + actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, followerActor.path().toString()).build()); return actorContext; } @@ -766,47 +969,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { return context; } - private ByteString toByteString(Map state) { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(state); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } catch (IOException e) { - Assert.fail("IOException in converting Hashmap to Bytestring:" + e); - } - return null; - } - - public static class ForwardMessageToBehaviorActor extends MessageCollectorActor { - AbstractRaftActorBehavior behavior; - - @Override public void onReceive(Object message) throws Exception { - if(behavior != null) { - behavior.handleMessage(sender(), message); - } - - super.onReceive(message); - } - - public static Props props() { - return Props.create(ForwardMessageToBehaviorActor.class); - } - } - @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); @@ -816,7 +978,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -867,14 +1029,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); + Map leaderPeerAddresses = new HashMap<>(); + leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setPeerAddresses(leaderPeerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); @@ -906,7 +1069,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(2, appendEntriesReply.getLogLastIndex()); assertEquals(1, appendEntriesReply.getLogLastTerm()); - leaderActor.underlyingActor().behavior = leader; + leaderActor.underlyingActor().setBehavior(follower); leader.handleMessage(followerActor, appendEntriesReply); leaderActor.underlyingActor().clear(); @@ -979,12 +1142,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(2, leaderActorContext.getCommitIndex()); - ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching( - leaderActor, ApplyLogEntries.class); + ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching( + leaderActor, ApplyJournalEntries.class); assertEquals(2, leaderActorContext.getLastApplied()); - assertEquals(2, applyLogEntries.getToIndex()); + assertEquals(2, applyJournalEntries.getToIndex()); List applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class); @@ -1059,7 +1222,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leaderActorContext.setPeerAddresses(peerAddresses); leader = new Leader(leaderActorContext); - leader.stopIsolatedLeaderCheckSchedule(); leader.markFollowerActive("follower-1"); leader.markFollowerActive("follower-2"); @@ -1110,9 +1272,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); followerActorContext.setConfigParams(configParams); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); leaderActorContext.getReplicatedLog().removeFrom(0); leaderActorContext.setCommitIndex(-1); @@ -1170,6 +1333,73 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { follower.close(); } + @Test + public void testLaggingFollowerStarvation() throws Exception { + logStart("testLaggingFollowerStarvation"); + new JavaTestKit(getSystem()) {{ + String leaderActorId = actorFactory.generateActorId("leader"); + String follower1ActorId = actorFactory.generateActorId("follower"); + String follower2ActorId = actorFactory.generateActorId("follower"); + + TestActorRef leaderActor = + actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId); + ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); + ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + leaderActorContext.setConfigParams(configParams); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1ActorId, + follower1Actor.path().toString()); + peerAddresses.put(follower2ActorId, + follower2Actor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.getTermInformation().update(1, leaderActorId); + + RaftActorBehavior leader = createBehavior(leaderActorContext); + + leaderActor.underlyingActor().setBehavior(leader); + + for(int i=1;i<6;i++) { + // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1)); + assertTrue(newBehavior == leader); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + + // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply + List heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); + + assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()), + heartbeats.size() > 1); + + // Check if follower-2 got AppendEntries during this time and was not starved + List appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class); + + assertTrue(String.format("%s append entries is less than expected", appendEntries.size()), + appendEntries.size() > 1); + + }}; + } + + @Override + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + ActorRef actorRef, RaftRPC rpc) throws Exception { + super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); + assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); + } + private class MockConfigParamsImpl extends DefaultConfigParamsImpl { private final long electionTimeOutIntervalMillis;