Merge "Bug 2731: Discard changes only when transaction exist."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 34e1ed9b3b3478032c86e9058998ac23610c7c97..c57fce1cd553d8c41dd786d9c4bb6f33e97791b6 100644 (file)
@@ -41,11 +41,12 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
 
-public class LeaderTest extends AbstractRaftActorBehaviorTest {
+public class LeaderTest extends AbstractLeaderTest {
 
     static final String FOLLOWER_ID = "follower";
 
@@ -742,7 +743,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     private MockRaftActorContext createActorContextWithFollower() {
         MockRaftActorContext actorContext = createActorContext();
-        actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
+        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
                 followerActor.path().toString()).build());
         return actorContext;
     }
@@ -756,22 +757,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return context;
     }
 
-    public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
-        AbstractRaftActorBehavior behavior;
-
-        @Override public void onReceive(Object message) throws Exception {
-            if(behavior != null) {
-                behavior.handleMessage(sender(), message);
-            }
-
-            super.onReceive(message);
-        }
-
-        public static Props props() {
-            return Props.create(ForwardMessageToBehaviorActor.class);
-        }
-    }
-
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
@@ -781,7 +766,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
         Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().behavior = follower;
+        followerActor.underlyingActor().setBehavior(follower);
 
         Map<String, String> peerAddresses = new HashMap<>();
         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -834,7 +819,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
         Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().behavior = follower;
+        followerActor.underlyingActor().setBehavior(follower);
 
         Map<String, String> peerAddresses = new HashMap<>();
         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -871,7 +856,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         assertEquals(2, appendEntriesReply.getLogLastIndex());
         assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-        leaderActor.underlyingActor().behavior = leader;
+        leaderActor.underlyingActor().setBehavior(follower);
         leader.handleMessage(followerActor, appendEntriesReply);
 
         leaderActor.underlyingActor().clear();
@@ -1076,7 +1061,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         followerActorContext.setConfigParams(configParams);
 
         Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().behavior = follower;
+        followerActor.underlyingActor().setBehavior(follower);
 
         leaderActorContext.getReplicatedLog().removeFrom(0);
         leaderActorContext.setCommitIndex(-1);
@@ -1134,6 +1119,66 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         follower.close();
     }
 
+    @Test
+    public void testLaggingFollowerStarvation() throws Exception {
+        logStart("testLaggingFollowerStarvation");
+        new JavaTestKit(getSystem()) {{
+            String leaderActorId = actorFactory.generateActorId("leader");
+            String follower1ActorId = actorFactory.generateActorId("follower");
+            String follower2ActorId = actorFactory.generateActorId("follower");
+
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+                    actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+            ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+            ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+            MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(follower1ActorId,
+                    follower1Actor.path().toString());
+            peerAddresses.put(follower2ActorId,
+                    follower2Actor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+            leaderActorContext.getTermInformation().update(1, leaderActorId);
+
+            RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+            leaderActor.underlyingActor().setBehavior(leader);
+
+            for(int i=1;i<6;i++) {
+                // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+                RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+                assertTrue(newBehavior == leader);
+                Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+            }
+
+            // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+            List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+            assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+                    heartbeats.size() > 1);
+
+            // Check if follower-2 got AppendEntries during this time and was not starved
+            List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+
+            assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+                    appendEntries.size() > 1);
+
+        }};
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {