X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=c57fce1cd553d8c41dd786d9c4bb6f33e97791b6;hb=45757bbd6c220f0f805715c84025c75a9388770a;hp=f087793674ec7f304192ec55871379f1adbfc03b;hpb=03e752cbd625921ece92c5281cd4e1a8c81b3210;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index f087793674..c57fce1cd5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -13,9 +13,6 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,7 +27,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; -import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -43,18 +39,17 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; -import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractRaftActorBehaviorTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; - private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); - private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader")); @@ -63,17 +58,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { private Leader leader; + @Override @After public void tearDown() throws Exception { if(leader != null) { leader.close(); } - actorFactory.close(); - } - - private void logStart(String name) { - LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name); + super.tearDown(); } @Test @@ -432,8 +424,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex()); assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); - // FIXME - we don't set the term in the serialized message. - //assertEquals(currentTerm, installSnapshot.getTerm()); + assertEquals(currentTerm, installSnapshot.getTerm()); } @Test @@ -752,7 +743,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { private MockRaftActorContext createActorContextWithFollower() { MockRaftActorContext actorContext = createActorContext(); - actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, + actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, followerActor.path().toString()).build()); return actorContext; } @@ -766,47 +757,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { return context; } - private ByteString toByteString(Map state) { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(state); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } catch (IOException e) { - Assert.fail("IOException in converting Hashmap to Bytestring:" + e); - } - return null; - } - - public static class ForwardMessageToBehaviorActor extends MessageCollectorActor { - AbstractRaftActorBehavior behavior; - - @Override public void onReceive(Object message) throws Exception { - if(behavior != null) { - behavior.handleMessage(sender(), message); - } - - super.onReceive(message); - } - - public static Props props() { - return Props.create(ForwardMessageToBehaviorActor.class); - } - } - @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); @@ -816,7 +766,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -869,7 +819,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -906,7 +856,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(2, appendEntriesReply.getLogLastIndex()); assertEquals(1, appendEntriesReply.getLogLastTerm()); - leaderActor.underlyingActor().behavior = leader; + leaderActor.underlyingActor().setBehavior(follower); leader.handleMessage(followerActor, appendEntriesReply); leaderActor.underlyingActor().clear(); @@ -1059,7 +1009,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leaderActorContext.setPeerAddresses(peerAddresses); leader = new Leader(leaderActorContext); - leader.stopIsolatedLeaderCheckSchedule(); leader.markFollowerActive("follower-1"); leader.markFollowerActive("follower-2"); @@ -1112,7 +1061,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { followerActorContext.setConfigParams(configParams); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); leaderActorContext.getReplicatedLog().removeFrom(0); leaderActorContext.setCommitIndex(-1); @@ -1170,6 +1119,73 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { follower.close(); } + @Test + public void testLaggingFollowerStarvation() throws Exception { + logStart("testLaggingFollowerStarvation"); + new JavaTestKit(getSystem()) {{ + String leaderActorId = actorFactory.generateActorId("leader"); + String follower1ActorId = actorFactory.generateActorId("follower"); + String follower2ActorId = actorFactory.generateActorId("follower"); + + TestActorRef leaderActor = + actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId); + ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); + ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + leaderActorContext.setConfigParams(configParams); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1ActorId, + follower1Actor.path().toString()); + peerAddresses.put(follower2ActorId, + follower2Actor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.getTermInformation().update(1, leaderActorId); + + RaftActorBehavior leader = createBehavior(leaderActorContext); + + leaderActor.underlyingActor().setBehavior(leader); + + for(int i=1;i<6;i++) { + // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1)); + assertTrue(newBehavior == leader); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + + // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply + List heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); + + assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()), + heartbeats.size() > 1); + + // Check if follower-2 got AppendEntries during this time and was not starved + List appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class); + + assertTrue(String.format("%s append entries is less than expected", appendEntries.size()), + appendEntries.size() > 1); + + }}; + } + + @Override + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + ActorRef actorRef, RaftRPC rpc) throws Exception { + super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); + assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); + } + private class MockConfigParamsImpl extends DefaultConfigParamsImpl { private final long electionTimeOutIntervalMillis;