Merge "Fixed discard-changes for mdsal netconf, mapping code cleanup."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 02c391f146630277c9fc60233193bb8108cd6961..6964db51f273f52ac0591b80cf75edf5997ec68c 100644 (file)
@@ -27,8 +27,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.TestActorFactory;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
@@ -40,17 +39,17 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
 
-public class LeaderTest extends AbstractRaftActorBehaviorTest {
+public class LeaderTest extends AbstractLeaderTest {
 
     static final String FOLLOWER_ID = "follower";
 
-    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-
     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
 
@@ -59,13 +58,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     private Leader leader;
 
+    @Override
     @After
     public void tearDown() throws Exception {
         if(leader != null) {
             leader.close();
         }
 
-        actorFactory.close();
+        super.tearDown();
     }
 
     @Test
@@ -743,7 +743,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     private MockRaftActorContext createActorContextWithFollower() {
         MockRaftActorContext actorContext = createActorContext();
-        actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
+        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
                 followerActor.path().toString()).build());
         return actorContext;
     }
@@ -757,22 +757,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return context;
     }
 
-    public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
-        AbstractRaftActorBehavior behavior;
-
-        @Override public void onReceive(Object message) throws Exception {
-            if(behavior != null) {
-                behavior.handleMessage(sender(), message);
-            }
-
-            super.onReceive(message);
-        }
-
-        public static Props props() {
-            return Props.create(ForwardMessageToBehaviorActor.class);
-        }
-    }
-
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
@@ -782,7 +766,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
         Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().behavior = follower;
+        followerActor.underlyingActor().setBehavior(follower);
 
         Map<String, String> peerAddresses = new HashMap<>();
         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -835,7 +819,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
         Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().behavior = follower;
+        followerActor.underlyingActor().setBehavior(follower);
 
         Map<String, String> peerAddresses = new HashMap<>();
         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -872,7 +856,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         assertEquals(2, appendEntriesReply.getLogLastIndex());
         assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-        leaderActor.underlyingActor().behavior = leader;
+        leaderActor.underlyingActor().setBehavior(follower);
         leader.handleMessage(followerActor, appendEntriesReply);
 
         leaderActor.underlyingActor().clear();
@@ -945,12 +929,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
         assertEquals(2, leaderActorContext.getCommitIndex());
 
-        ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
-                leaderActor, ApplyLogEntries.class);
+        ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+                leaderActor, ApplyJournalEntries.class);
 
         assertEquals(2, leaderActorContext.getLastApplied());
 
-        assertEquals(2, applyLogEntries.getToIndex());
+        assertEquals(2, applyJournalEntries.getToIndex());
 
         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
                 ApplyState.class);
@@ -1025,7 +1009,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             leaderActorContext.setPeerAddresses(peerAddresses);
 
             leader = new Leader(leaderActorContext);
-            leader.stopIsolatedLeaderCheckSchedule();
 
             leader.markFollowerActive("follower-1");
             leader.markFollowerActive("follower-2");
@@ -1078,7 +1061,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         followerActorContext.setConfigParams(configParams);
 
         Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().behavior = follower;
+        followerActor.underlyingActor().setBehavior(follower);
 
         leaderActorContext.getReplicatedLog().removeFrom(0);
         leaderActorContext.setCommitIndex(-1);
@@ -1136,6 +1119,73 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         follower.close();
     }
 
+    @Test
+    public void testLaggingFollowerStarvation() throws Exception {
+        logStart("testLaggingFollowerStarvation");
+        new JavaTestKit(getSystem()) {{
+            String leaderActorId = actorFactory.generateActorId("leader");
+            String follower1ActorId = actorFactory.generateActorId("follower");
+            String follower2ActorId = actorFactory.generateActorId("follower");
+
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+                    actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+            ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+            ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+            MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(follower1ActorId,
+                    follower1Actor.path().toString());
+            peerAddresses.put(follower2ActorId,
+                    follower2Actor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+            leaderActorContext.getTermInformation().update(1, leaderActorId);
+
+            RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+            leaderActor.underlyingActor().setBehavior(leader);
+
+            for(int i=1;i<6;i++) {
+                // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+                assertTrue(newBehavior == leader);
+                Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+            }
+
+            // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+            List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+            assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+                    heartbeats.size() > 1);
+
+            // Check if follower-2 got AppendEntries during this time and was not starved
+            List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+
+            assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+                    appendEntries.size() > 1);
+
+        }};
+    }
+
+    @Override
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+            ActorRef actorRef, RaftRPC rpc) throws Exception {
+        super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
+        assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
+    }
+
     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
 
         private final long electionTimeOutIntervalMillis;