Add leader unit test for non-voting consensus 27/29027/4
authorTom Pantelis <tpanteli@brocade.com>
Fri, 30 Oct 2015 01:20:24 +0000 (21:20 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 2 Nov 2015 13:32:54 +0000 (13:32 +0000)
Added a test case to LeaderTest to verify a non-voting follower
does not influence replication consensus.

Also I saw intermitent test failures (in jenkins as well during first
verify build) due to a message going to dead letters shortly after
actor creation (also reported in Bug 4223). Specifically it was occurring
when the leader sent the initial AppendEntries heartbeat to a follower. This
seems like a timing issue/bug in akka when using an ActorSelection. I
added code in the TestActorFactory to use an actorSelection and call
resolveOne in a retry loop. This seems to alleviate the issue as I ran
LeaderTest over 1000 times successfully.

Change-Id: I65cb87f419c280befe2d82300a981bd8e6f88742
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 82a5b498d244e106b76924689b9ade3a72ba3835..79df92b7d4d64280f9795dfa64e0e0cd507d8fc1 100644 (file)
@@ -23,10 +23,14 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 
 /**
  * TestActorFactory provides methods to create both normal and test actors and to kill them when the factory is closed
@@ -58,8 +62,7 @@ public class TestActorFactory implements AutoCloseable {
      */
     public ActorRef createActor(Props props){
         ActorRef actorRef = system.actorOf(props);
-        createdActors.add(actorRef);
-        return actorRef;
+        return addActor(actorRef);
     }
 
     /**
@@ -70,8 +73,7 @@ public class TestActorFactory implements AutoCloseable {
      */
     public ActorRef createActor(Props props, String actorId){
         ActorRef actorRef = system.actorOf(props, actorId);
-        createdActors.add(actorRef);
-        return actorRef;
+        return addActor(actorRef);
     }
 
     /**
@@ -81,22 +83,49 @@ public class TestActorFactory implements AutoCloseable {
      * @param <T>
      * @return
      */
+    @SuppressWarnings("unchecked")
     public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId){
         TestActorRef<T> actorRef = TestActorRef.create(system, props, actorId);
+        return (TestActorRef<T>) addActor(actorRef);
+    }
+
+    private <T extends ActorRef> ActorRef addActor(T actorRef) {
         createdActors.add(actorRef);
+        verifyActorReady(actorRef);
         return actorRef;
     }
 
+    private void verifyActorReady(ActorRef actorRef) {
+        // Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite
+        // in a state yet to receive messages or isn't actually created yet. This seems to happen with
+        // actorSelection so, to alleviate it, we use an actorSelection and call resolveOne with retries to
+        // ensure it's ready.
+
+        int tries = 1;
+        while(true) {
+            try {
+                Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS);
+                Future<ActorRef> future = system.actorSelection(actorRef.path()).resolveOne(timeout);
+                Await.ready(future, timeout.duration());
+                break;
+            } catch (Exception e) {
+                if(tries++ > 20) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
     /**
      * Create a test actor with an auto-generated name
      * @param props
      * @param <T>
      * @return
      */
+    @SuppressWarnings("unchecked")
     public <T extends Actor> TestActorRef<T> createTestActor(Props props){
         TestActorRef<T> actorRef = TestActorRef.create(system, props);
-        createdActors.add(actorRef);
-        return actorRef;
+        return (TestActorRef<T>) addActor(actorRef);
     }
 
     /**
index 507c30095240f22c9d282d92c69a9f5f9bf8871c..6664600073f2c727936ece4da93f6cd5b76bb290 100644 (file)
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -1879,6 +1880,69 @@ public class LeaderTest extends AbstractLeaderTest {
         }};
     }
 
+    @Test
+    public void testReplicationConsensusWithNonVotingFollower() {
+        logStart("testReplicationConsensusWithNonVotingFollower");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        String nonVotingFollowerId = "nonvoting-follower";
+        TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
+                Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+
+        leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+
+        leader = new Leader(leaderActorContext);
+
+        // Ignore initial heartbeats
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Send a Replicate message and wait for AppendEntries.
+        sendReplicate(leaderActorContext, 0);
+
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+        // Send reply only from the voting follower and verify consensus via ApplyState.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Send another Replicate message
+        sendReplicate(leaderActorContext, 1);
+
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
+                AppendEntries.class);
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+
+        // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
+
+        MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
+
+        // Send reply from the voting follower and verify consensus.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {