X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=ba0bd0f29c96237ab758487b719eac959d115edc;hb=95115ca49f3b16b936e0f6c88aedfc17cd0ee92c;hp=3f085df8dc3b858879981c27e223262f2a5bc40f;hpb=753515e8868a1a15982d3f2697439f522f273db5;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 3f085df8dc..ba0bd0f29c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -48,6 +48,7 @@ import scala.concurrent.duration.FiniteDuration; public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; + public static final String LEADER_ID = "leader"; private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader")); @@ -119,6 +120,15 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 1, index, payload); + actorContext.getReplicatedLog().append(newEntry); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); + } + @Test public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); @@ -145,8 +155,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( 1, lastIndex + 1, payload); actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(null, null, newEntry)); + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -160,6 +169,218 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); } + @Test + public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception { + logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<5;i++) { + sendReplicate(actorContext, lastIndex+i+1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect only 1 message to be sent because of two reasons, + // - an append entries reply was not received + // - the heartbeat interval has not expired + // In this scenario if multiple messages are sent they would likely be duplicates + assertEquals("The number of append entries collected should be 1", 1, allMessages.size()); + } + + @Test + public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception { + logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<3;i++) { + sendReplicate(actorContext, lastIndex+i+1); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex + i + 1, term)); + + } + + for(int i=3;i<5;i++) { + sendReplicate(actorContext, lastIndex + i + 1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would + // get sent to the follower - but not the 5th + assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + + for(int i=0;i<4;i++) { + long expected = allMessages.get(i).getEntries().get(0).getIndex(); + assertEquals(expected, i+2); + } + } + + @Test + public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception { + logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(500, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + sendReplicate(actorContext, lastIndex+1); + + // Wait slightly longer than heartbeat duration + Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(1, allMessages.get(0).getEntries().size()); + assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + assertEquals(1, allMessages.get(1).getEntries().size()); + assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + + } + + @Test + public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception { + logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<3;i++) { + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 3", 3, allMessages.size()); + } + + @Test + public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception { + logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + sendReplicate(actorContext, lastIndex+1); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(0, allMessages.get(0).getEntries().size()); + assertEquals(1, allMessages.get(1).getEntries().size()); + } + + @Test public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { logStart("testHandleReplicateMessageWhenThereAreNoFollowers"); @@ -304,6 +525,8 @@ public class LeaderTest extends AbstractLeaderTest { new ReplicatedLogImplEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); + actorContext.getReplicatedLog().append(entry); + //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); @@ -726,7 +949,7 @@ public class LeaderTest extends AbstractLeaderTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef) { - return createActorContext("leader", actorRef); + return createActorContext(LEADER_ID, actorRef); } private MockRaftActorContext createActorContextWithFollower() { @@ -805,14 +1028,15 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); + Map leaderPeerAddresses = new HashMap<>(); + leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setPeerAddresses(leaderPeerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); @@ -1047,6 +1271,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); followerActorContext.setConfigParams(configParams); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower);