X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=d19b669d27532278344ead713f3e7da17dfabeb2;hb=859ad8ac73dab8064013a95ab4962eecbfb6d346;hp=6964db51f273f52ac0591b80cf75edf5997ec68c;hpb=f78b7f15e24efdb5dd9f91b487bc63dad7517b1c;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 6964db51f2..d19b669d27 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -120,6 +120,15 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 1, index, payload); + actorContext.getReplicatedLog().append(newEntry); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); + } + @Test public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); @@ -146,8 +155,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( 1, lastIndex + 1, payload); actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(null, null, newEntry)); + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -161,6 +169,218 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); } + @Test + public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception { + logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<5;i++) { + sendReplicate(actorContext, lastIndex+i+1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect only 1 message to be sent because of two reasons, + // - an append entries reply was not received + // - the heartbeat interval has not expired + // In this scenario if multiple messages are sent they would likely be duplicates + assertEquals("The number of append entries collected should be 1", 1, allMessages.size()); + } + + @Test + public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception { + logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<3;i++) { + sendReplicate(actorContext, lastIndex+i+1); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex + i + 1, term)); + + } + + for(int i=3;i<5;i++) { + sendReplicate(actorContext, lastIndex + i + 1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would + // get sent to the follower - but not the 5th + assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + + for(int i=0;i<4;i++) { + long expected = allMessages.get(i).getEntries().get(0).getIndex(); + assertEquals(expected, i+2); + } + } + + @Test + public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception { + logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(500, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + sendReplicate(actorContext, lastIndex+1); + + // Wait slightly longer than heartbeat duration + Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(1, allMessages.get(0).getEntries().size()); + assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + assertEquals(1, allMessages.get(1).getEntries().size()); + assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + + } + + @Test + public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception { + logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<3;i++) { + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 3", 3, allMessages.size()); + } + + @Test + public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception { + logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + sendReplicate(actorContext, lastIndex+1); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(0, allMessages.get(0).getEntries().size()); + assertEquals(1, allMessages.get(1).getEntries().size()); + } + + @Test public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { logStart("testHandleReplicateMessageWhenThereAreNoFollowers");