X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=c57fce1cd553d8c41dd786d9c4bb6f33e97791b6;hp=34e1ed9b3b3478032c86e9058998ac23610c7c97;hb=5c008222efa5c0af49cf8a52881a6299b1e249dc;hpb=ea3673e89598b896c93ebee864e6cb8db7f6c6ec diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 34e1ed9b3b..c57fce1cd5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -41,11 +41,12 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractRaftActorBehaviorTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; @@ -742,7 +743,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { private MockRaftActorContext createActorContextWithFollower() { MockRaftActorContext actorContext = createActorContext(); - actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, + actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, followerActor.path().toString()).build()); return actorContext; } @@ -756,22 +757,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { return context; } - public static class ForwardMessageToBehaviorActor extends MessageCollectorActor { - AbstractRaftActorBehavior behavior; - - @Override public void onReceive(Object message) throws Exception { - if(behavior != null) { - behavior.handleMessage(sender(), message); - } - - super.onReceive(message); - } - - public static Props props() { - return Props.create(ForwardMessageToBehaviorActor.class); - } - } - @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); @@ -781,7 +766,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -834,7 +819,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -871,7 +856,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(2, appendEntriesReply.getLogLastIndex()); assertEquals(1, appendEntriesReply.getLogLastTerm()); - leaderActor.underlyingActor().behavior = leader; + leaderActor.underlyingActor().setBehavior(follower); leader.handleMessage(followerActor, appendEntriesReply); leaderActor.underlyingActor().clear(); @@ -1076,7 +1061,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { followerActorContext.setConfigParams(configParams); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); leaderActorContext.getReplicatedLog().removeFrom(0); leaderActorContext.setCommitIndex(-1); @@ -1134,6 +1119,66 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { follower.close(); } + @Test + public void testLaggingFollowerStarvation() throws Exception { + logStart("testLaggingFollowerStarvation"); + new JavaTestKit(getSystem()) {{ + String leaderActorId = actorFactory.generateActorId("leader"); + String follower1ActorId = actorFactory.generateActorId("follower"); + String follower2ActorId = actorFactory.generateActorId("follower"); + + TestActorRef leaderActor = + actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId); + ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); + ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + leaderActorContext.setConfigParams(configParams); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1ActorId, + follower1Actor.path().toString()); + peerAddresses.put(follower2ActorId, + follower2Actor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.getTermInformation().update(1, leaderActorId); + + RaftActorBehavior leader = createBehavior(leaderActorContext); + + leaderActor.underlyingActor().setBehavior(leader); + + for(int i=1;i<6;i++) { + // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1)); + assertTrue(newBehavior == leader); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + + // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply + List heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); + + assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()), + heartbeats.size() > 1); + + // Check if follower-2 got AppendEntries during this time and was not starved + List appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class); + + assertTrue(String.format("%s append entries is less than expected", appendEntries.size()), + appendEntries.size() > 1); + + }}; + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception {