import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
/**
* TestActorFactory provides methods to create both normal and test actors and to kill them when the factory is closed
*/
public ActorRef createActor(Props props){
ActorRef actorRef = system.actorOf(props);
- createdActors.add(actorRef);
- return actorRef;
+ return addActor(actorRef);
}
/**
*/
public ActorRef createActor(Props props, String actorId){
ActorRef actorRef = system.actorOf(props, actorId);
- createdActors.add(actorRef);
- return actorRef;
+ return addActor(actorRef);
}
/**
* @param <T>
* @return
*/
+ @SuppressWarnings("unchecked")
public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId){
TestActorRef<T> actorRef = TestActorRef.create(system, props, actorId);
+ return (TestActorRef<T>) addActor(actorRef);
+ }
+
+ private <T extends ActorRef> ActorRef addActor(T actorRef) {
createdActors.add(actorRef);
+ verifyActorReady(actorRef);
return actorRef;
}
+ private void verifyActorReady(ActorRef actorRef) {
+ // Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite
+ // in a state yet to receive messages or isn't actually created yet. This seems to happen with
+ // actorSelection so, to alleviate it, we use an actorSelection and call resolveOne with retries to
+ // ensure it's ready.
+
+ int tries = 1;
+ while(true) {
+ try {
+ Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS);
+ Future<ActorRef> future = system.actorSelection(actorRef.path()).resolveOne(timeout);
+ Await.ready(future, timeout.duration());
+ break;
+ } catch (Exception e) {
+ if(tries++ > 20) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
/**
* Create a test actor with an auto-generated name
* @param props
* @param <T>
* @return
*/
+ @SuppressWarnings("unchecked")
public <T extends Actor> TestActorRef<T> createTestActor(Props props){
TestActorRef<T> actorRef = TestActorRef.create(system, props);
- createdActors.add(actorRef);
- return actorRef;
+ return (TestActorRef<T>) addActor(actorRef);
}
/**
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
}};
}
+ @Test
+ public void testReplicationConsensusWithNonVotingFollower() {
+ logStart("testReplicationConsensusWithNonVotingFollower");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ String nonVotingFollowerId = "nonvoting-follower";
+ TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+
+ leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+
+ leader = new Leader(leaderActorContext);
+
+ // Ignore initial heartbeats
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send a Replicate message and wait for AppendEntries.
+ sendReplicate(leaderActorContext, 0);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ // Send reply only from the voting follower and verify consensus via ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send another Replicate message
+ sendReplicate(leaderActorContext, 1);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
+ AppendEntries.class);
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+
+ // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
+
+ // Send reply from the voting follower and verify consensus.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {