--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Abstract base for a leader election scenario test.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbstractLeaderElectionScenarioTest {
+ static final int HEARTBEAT_INTERVAL = 50;
+
+ static class MemberActor extends MessageCollectorActor {
+
+ volatile RaftActorBehavior behavior;
+ Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
+ Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
+ CountDownLatch behaviorStateChangeLatch;
+
+ public static Props props() {
+ return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ // Ignore scheduled SendHeartBeat messages.
+ if(message instanceof SendHeartBeat) {
+ return;
+ }
+
+ try {
+ if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
+ RaftActorBehavior oldBehavior = behavior;
+ behavior = behavior.handleMessage(getSender(), message);
+ if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
+ behaviorStateChangeLatch.countDown();
+ }
+ }
+ } finally {
+ super.onReceive(message);
+
+ CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
+ if(latch != null) {
+ latch.countDown();
+ }
+ }
+ }
+
+ void expectBehaviorStateChange() {
+ behaviorStateChangeLatch = new CountDownLatch(1);
+ }
+
+ void waitForBehaviorStateChange() {
+ assertTrue("Expected behavior state change",
+ Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
+ }
+
+ void expectMessageClass(Class<?> expClass, int expCount) {
+ messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
+ }
+
+ void waitForExpectedMessages(Class<?> expClass) {
+ CountDownLatch latch = messagesReceivedLatches.get(expClass);
+ assertNotNull("No messages received for " + expClass, latch);
+ assertTrue("Missing messages of type " + expClass,
+ Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
+ }
+
+ void dropMessagesToBehavior(Class<?> msgClass) {
+ dropMessagesToBehavior(msgClass, 1);
+ }
+
+ void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
+ expectMessageClass(msgClass, expCount);
+ dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
+ }
+
+ void clearDropMessagesToBehavior() {
+ dropMessagesToBehavior.clear();
+ }
+
+ @Override
+ public void clear() {
+ behaviorStateChangeLatch = null;
+ clearDropMessagesToBehavior();
+ messagesReceivedLatches.clear();
+ super.clear();
+ }
+
+ void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
+ Object message = getFirstMatching(getSelf(), msgClass);
+ assertNotNull("Message of type " + msgClass + " not received", message);
+ getSelf().tell(message, sender);
+ }
+
+ void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
+ for(Object m: getAllMatching(getSelf(), msgClass)) {
+ getSelf().tell(m, sender);
+ }
+ }
+
+ <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
+ Object message = getFirstMatching(getSelf(), msgClass);
+ assertNotNull("Message of type " + msgClass + " not received", message);
+ return (T) message;
+ }
+ }
+
+ protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
+ protected final ActorSystem system = ActorSystem.create("test");
+ protected TestActorRef<MemberActor> member1ActorRef;
+ protected TestActorRef<MemberActor> member2ActorRef;
+ protected TestActorRef<MemberActor> member3ActorRef;
+ protected MemberActor member1Actor;
+ protected MemberActor member2Actor;
+ protected MemberActor member3Actor;
+ protected MockRaftActorContext member1Context;
+ protected MockRaftActorContext member2Context;
+ protected MockRaftActorContext member3Context;
+
+ @Before
+ public void setup() throws Exception {
+ member1ActorRef = newMemberActor("member1");
+ member2ActorRef = newMemberActor("member2");
+ member3ActorRef = newMemberActor("member3");
+
+ member1Actor = member1ActorRef.underlyingActor();
+ member2Actor = member2ActorRef.underlyingActor();
+ member3Actor = member3ActorRef.underlyingActor();
+ }
+
+ @After
+ public void tearDown() {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ DefaultConfigParamsImpl newConfigParams() {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ return configParams;
+ }
+
+ MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
+ Map<String, String> peerAddresses) {
+ MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
+ context.setPeerAddresses(peerAddresses);
+ context.getTermInformation().updateAndPersist(1, "");
+ return context;
+ }
+
+ void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
+ assertEquals(name + " behavior state", expState, actor.behavior.state());
+ }
+
+ void initializeLeaderBehavior(MemberActor actor, RaftActorContext context,
+ int numActiveFollowers) throws Exception {
+ // Leader sends immediate heartbeats - we don't care about it so ignore it.
+
+ actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
+ Leader leader = new Leader(context);
+ actor.waitForExpectedMessages(AppendEntriesReply.class);
+ actor.behavior = leader;
+
+ actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
+ actor.clear();
+ }
+
+ TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
+ TestActorRef<MemberActor> actor = TestActorRef.create(system, MemberActor.props(), name);
+ MessageCollectorActor.waitUntilReady(actor);
+ return actor;
+ }
+
+ void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
+ Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
+ leaderActor.underlyingActor().behavior.handleMessage(leaderActor, new SendHeartBeat());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+
+/**
+ * A leader election scenario test that delays various messages to behaviors to simulate network delays.
+ *
+ * @author Thomas Pantelis
+ */
+public class DelayedMessagesElectionScenarioTest extends AbstractLeaderElectionScenarioTest {
+
+ @Test
+ public void runTest() throws Exception {
+ testLog.info("DelayedMessagesElectionScenarioTest starting");
+
+ setupInitialMemberBehaviors();
+
+ sendInitialElectionTimeoutToFollowerMember2();
+
+ forwardDelayedRequestVotesToLeaderMember1AndFollowerMember3();
+
+ sendElectionTimeoutToFollowerMember3();
+
+ forwardDelayedRequestVoteReplyFromOriginalFollowerMember3ToMember2();
+
+ testLog.info("DelayedMessagesElectionScenarioTest ending");
+ }
+
+ private void forwardDelayedRequestVoteReplyFromOriginalFollowerMember3ToMember2() throws Exception {
+ testLog.info("forwardDelayedRequestVoteReplyFromOriginalFollowerMember3ToMember2 starting");
+
+ // Now forward the original delayed RequestVoteReply from member 3 to member 2 that granted
+ // the vote. Since member 2 is now a Follower, the RequestVoteReply should be ignored.
+
+ member2Actor.clearDropMessagesToBehavior();
+ member2Actor.forwardCapturedMessageToBehavior(RequestVoteReply.class, member3ActorRef);
+
+ member2Actor.waitForExpectedMessages(RequestVoteReply.class);
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
+ verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Leader);
+
+ assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 3, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("forwardDelayedRequestVoteReplyFromOriginalFollowerMember3ToMember2 ending");
+ }
+
+ private void sendElectionTimeoutToFollowerMember3() throws Exception {
+ testLog.info("sendElectionTimeoutToFollowerMember3 starting");
+
+ // Send ElectionTimeout to member 3 to simulate missing heartbeat from a Leader. member 3
+ // should switch to Candidate and send out RequestVote messages. member 1 should grant the
+ // vote and send a reply. After receiving the RequestVoteReply, member 3 should switch to leader.
+
+ member2Actor.expectBehaviorStateChange();
+ member3Actor.clear();
+ member3Actor.expectMessageClass(RequestVoteReply.class, 1);
+ member3Actor.expectMessageClass(AppendEntriesReply.class, 2);
+
+ member3ActorRef.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ member3Actor.waitForExpectedMessages(RequestVoteReply.class);
+
+ RequestVoteReply requestVoteReply = member3Actor.getCapturedMessage(RequestVoteReply.class);
+ assertEquals("getTerm", member3Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
+ assertEquals("isVoteGranted", true, requestVoteReply.isVoteGranted());
+
+ verifyBehaviorState("member 3", member3Actor, RaftState.Leader);
+
+ // member 2 should've switched to Follower as member 3's RequestVote term (3) was greater
+ // than member 2's term (2).
+
+ member2Actor.waitForBehaviorStateChange();
+ verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
+
+ // The switch to leader should cause an immediate AppendEntries heartbeat from member 3.
+
+ member3Actor.waitForExpectedMessages(AppendEntriesReply.class);
+
+ assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 3, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("sendElectionTimeoutToFollowerMember3 ending");
+ }
+
+ private void forwardDelayedRequestVotesToLeaderMember1AndFollowerMember3() throws Exception {
+ testLog.info("forwardDelayedRequestVotesToLeaderMember1AndFollowerMember3 starting");
+
+ // At this point member 1 and 3 actors have captured the RequestVote messages. First
+ // forward the RequestVote message to member 1's behavior. Since the RequestVote term
+ // is greater than member 1's term and member 1 is a Leader, member 1 should switch to Follower
+ // without replying to RequestVote and update its term to 2.
+
+ member1Actor.clearDropMessagesToBehavior();
+ member1Actor.expectBehaviorStateChange();
+ member1Actor.forwardCapturedMessageToBehavior(RequestVote.class, member2ActorRef);
+ member1Actor.waitForExpectedMessages(RequestVote.class);
+
+ member1Actor.waitForBehaviorStateChange();
+ verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
+
+ // Now forward member 3's captured RequestVote message to its behavior. Since member 3 is
+ // already a Follower, it should update its term to 2 and send a RequestVoteReply back to
+ // member 2 granting the vote b/c the RequestVote's term, lastLogTerm, and lastLogIndex
+ // should satisfy the criteria for granting the vote. However, we'll delay sending the
+ // RequestVoteReply to member 2's behavior to simulate network latency.
+
+ member2Actor.dropMessagesToBehavior(RequestVoteReply.class);
+
+ member3Actor.clearDropMessagesToBehavior();
+ member3Actor.expectMessageClass(RequestVote.class, 1);
+ member3Actor.forwardCapturedMessageToBehavior(RequestVote.class, member2ActorRef);
+ member3Actor.waitForExpectedMessages(RequestVote.class);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Follower);
+
+ assertEquals("member 1 election term", 2, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 2, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 2, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("forwardDelayedRequestVotesToLeaderMember1AndFollowerMember3 ending");
+ }
+
+ private void sendInitialElectionTimeoutToFollowerMember2() {
+ testLog.info("sendInitialElectionTimeoutToFollowerMember2 starting");
+
+ // Send ElectionTimeout to member 2 to simulate missing heartbeat from the Leader. member 2
+ // should switch to Candidate and send out RequestVote messages. Set member 1 and 3 actors
+ // to capture RequestVote but not to forward to the behavior just yet as we want to
+ // control the order of RequestVote messages to member 1 and 3.
+
+ member1Actor.dropMessagesToBehavior(RequestVote.class);
+
+ member2Actor.expectBehaviorStateChange();
+
+ member3Actor.dropMessagesToBehavior(RequestVote.class);
+
+ member2ActorRef.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ member1Actor.waitForExpectedMessages(RequestVote.class);
+ member3Actor.waitForExpectedMessages(RequestVote.class);
+
+ member2Actor.waitForBehaviorStateChange();
+ verifyBehaviorState("member 2", member2Actor, RaftState.Candidate);
+
+ assertEquals("member 1 election term", 1, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 2, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 1, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("sendInitialElectionTimeoutToFollowerMember2 ending");
+ }
+
+ private void setupInitialMemberBehaviors() throws Exception {
+ testLog.info("setupInitialMemberBehaviors starting");
+
+ // Create member 2's behavior initially as Follower
+
+ member2Context = newRaftActorContext("member2", member2ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member1", member1ActorRef.path().toString()).
+ put("member3", member3ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
+ member2Context.setConfigParams(member2ConfigParams);
+
+ Follower member2Behavior = new Follower(member2Context);
+ member2Actor.behavior = member2Behavior;
+
+ // Create member 3's behavior initially as Follower
+
+ member3Context = newRaftActorContext("member3", member3ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member1", member1ActorRef.path().toString()).
+ put("member2", member2ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
+ member3Context.setConfigParams(member3ConfigParams);
+
+ Follower member3Behavior = new Follower(member3Context);
+ member3Actor.behavior = member3Behavior;
+
+ // Create member 1's behavior initially as Leader
+
+ member1Context = newRaftActorContext("member1", member1ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member2", member2ActorRef.path().toString()).
+ put("member3", member3ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
+ member1Context.setConfigParams(member1ConfigParams);
+
+ initializeLeaderBehavior(member1Actor, member1Context, 2);
+
+ member2Actor.clear();
+ member3Actor.clear();
+
+ testLog.info("setupInitialMemberBehaviors ending");
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.raft.behaviors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
-import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
-import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.MockRaftActorContext.SimpleReplicatedLog;
-import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
-import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.impl.SimpleLogger;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Tests various leader election scenarios.
- *
- * @author Thomas Pantelis
- */
-public class LeaderElectionScenariosTest {
-
- private static final int HEARTBEAT_INTERVAL = 50;
-
- public static class MemberActor extends MessageCollectorActor {
-
- volatile RaftActorBehavior behavior;
- Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
- Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
- CountDownLatch behaviorStateChangeLatch;
-
- public static Props props() {
- return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
- }
-
- @Override
- public void onReceive(Object message) throws Exception {
- // Ignore scheduled SendHeartBeat messages.
- if(message instanceof SendHeartBeat) {
- return;
- }
-
- try {
- if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
- RaftActorBehavior oldBehavior = behavior;
- behavior = behavior.handleMessage(getSender(), message);
- if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
- behaviorStateChangeLatch.countDown();
- }
- }
- } finally {
- super.onReceive(message);
-
- CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
- if(latch != null) {
- latch.countDown();
- }
- }
- }
-
- void expectBehaviorStateChange() {
- behaviorStateChangeLatch = new CountDownLatch(1);
- }
-
- void waitForBehaviorStateChange() {
- assertTrue("Expected behavior state change",
- Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
- }
-
- void expectMessageClass(Class<?> expClass, int expCount) {
- messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
- }
-
- void waitForExpectedMessages(Class<?> expClass) {
- CountDownLatch latch = messagesReceivedLatches.get(expClass);
- assertNotNull("No messages received for " + expClass, latch);
- assertTrue("Missing messages of type " + expClass,
- Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
- }
-
- void dropMessagesToBehavior(Class<?> msgClass) {
- dropMessagesToBehavior(msgClass, 1);
- }
-
- void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
- expectMessageClass(msgClass, expCount);
- dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
- }
-
- void clearDropMessagesToBehavior() {
- dropMessagesToBehavior.clear();
- }
-
- @Override
- public void clear() {
- behaviorStateChangeLatch = null;
- clearDropMessagesToBehavior();
- messagesReceivedLatches.clear();
- super.clear();
- }
-
- void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
- Object message = getFirstMatching(getSelf(), msgClass);
- assertNotNull("Message of type " + msgClass + " not received", message);
- getSelf().tell(message, sender);
- }
-
- void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
- for(Object m: getAllMatching(getSelf(), msgClass)) {
- getSelf().tell(m, sender);
- }
- }
-
- <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
- Object message = getFirstMatching(getSelf(), msgClass);
- assertNotNull("Message of type " + msgClass + " not received", message);
- return (T) message;
- }
- }
-
- static {
- System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace");
- }
-
- private final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
- private final ActorSystem system = ActorSystem.create("test");
-
- @After
- public void tearDown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
- private DefaultConfigParamsImpl newConfigParams() {
- DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
- configParams.setElectionTimeoutFactor(100000);
- configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- return configParams;
- }
-
- private MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
- Map<String, String> peerAddresses) {
- MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
- context.setPeerAddresses(peerAddresses);
- context.getTermInformation().updateAndPersist(1, "");
- return context;
- }
-
- private void verifyBehaviorState(String name, TestActorRef<MemberActor> actor, RaftState expState) {
- assertEquals(name + " behavior state", expState, actor.underlyingActor().behavior.state());
- }
-
- private void initializeLeaderBehavior(TestActorRef<MemberActor> actor, RaftActorContext context,
- int numActiveFollowers) throws Exception {
- // Leader sends immediate heartbeats - we don't care about it so ignore it.
-
- actor.underlyingActor().expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
- Leader leader = new Leader(context);
- actor.underlyingActor().waitForExpectedMessages(AppendEntriesReply.class);
- actor.underlyingActor().behavior = leader;
-
- actor.underlyingActor().forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
- actor.underlyingActor().clear();
- }
-
- private TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
- TestActorRef<MemberActor> actor = TestActorRef.create(system, MemberActor.props(), name);
- MessageCollectorActor.waitUntilReady(actor);
- return actor;
- }
-
- private void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
- Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
- leaderActor.underlyingActor().behavior.handleMessage(leaderActor, new SendHeartBeat());
- }
-
- @Test
- public void testDelayedMessagesScenario() throws Exception {
- testLog.info("Starting testDelayedMessagesScenario");
-
- TestActorRef<MemberActor> member1Actor = newMemberActor("member1");
- TestActorRef<MemberActor> member2Actor = newMemberActor("member2");
- TestActorRef<MemberActor> member3Actor = newMemberActor("member3");
-
- // Create member 2's behavior initially as Follower
-
- MockRaftActorContext member2Context = newRaftActorContext("member2", member2Actor,
- ImmutableMap.<String,String>builder().
- put("member1", member1Actor.path().toString()).
- put("member3", member3Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
- member2Context.setConfigParams(member2ConfigParams);
-
- Follower member2Behavior = new Follower(member2Context);
- member2Actor.underlyingActor().behavior = member2Behavior;
-
- // Create member 3's behavior initially as Follower
-
- MockRaftActorContext member3Context = newRaftActorContext("member3", member3Actor,
- ImmutableMap.<String,String>builder().
- put("member1", member1Actor.path().toString()).
- put("member2", member2Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
- member3Context.setConfigParams(member3ConfigParams);
-
- Follower member3Behavior = new Follower(member3Context);
- member3Actor.underlyingActor().behavior = member3Behavior;
-
- // Create member 1's behavior initially as Leader
-
- MockRaftActorContext member1Context = newRaftActorContext("member1", member1Actor,
- ImmutableMap.<String,String>builder().
- put("member2", member2Actor.path().toString()).
- put("member3", member3Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
- member1Context.setConfigParams(member1ConfigParams);
-
- initializeLeaderBehavior(member1Actor, member1Context, 2);
-
- member2Actor.underlyingActor().clear();
- member3Actor.underlyingActor().clear();
-
- // Send ElectionTimeout to member 2 to simulate missing heartbeat from the Leader. member 2
- // should switch to Candidate and send out RequestVote messages. Set member 1 and 3 actors
- // to capture RequestVote but not to forward to the behavior just yet as we want to
- // control the order of RequestVote messages to member 1 and 3.
-
- member1Actor.underlyingActor().dropMessagesToBehavior(RequestVote.class);
-
- member2Actor.underlyingActor().expectBehaviorStateChange();
-
- member3Actor.underlyingActor().dropMessagesToBehavior(RequestVote.class);
-
- member2Actor.tell(new ElectionTimeout(), ActorRef.noSender());
-
- member1Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
- member3Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
-
- member2Actor.underlyingActor().waitForBehaviorStateChange();
- verifyBehaviorState("member 2", member2Actor, RaftState.Candidate);
-
- assertEquals("member 1 election term", 1, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 2, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 1, member3Context.getTermInformation().getCurrentTerm());
-
- // At this point member 1 and 3 actors have captured the RequestVote messages. First
- // forward the RequestVote message to member 1's behavior. Since the RequestVote term
- // is greater than member 1's term, member 1 should switch to Follower without replying
- // to RequestVote and update its term to 2.
-
- member1Actor.underlyingActor().clearDropMessagesToBehavior();
- member1Actor.underlyingActor().expectBehaviorStateChange();
- member1Actor.underlyingActor().forwardCapturedMessageToBehavior(RequestVote.class, member2Actor);
- member1Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
-
- member1Actor.underlyingActor().waitForBehaviorStateChange();
- verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
-
- // Now forward member 3's captured RequestVote message to its behavior. Since member 3 is
- // already a Follower, it should update its term to 2 and send a RequestVoteReply back to
- // member 2 granting the vote b/c the RequestVote's term, lastLogTerm, and lastLogIndex
- // should satisfy the criteria for granting the vote. However, we'll delay sending the
- // RequestVoteReply to member 2's behavior to simulate network latency.
-
- member2Actor.underlyingActor().dropMessagesToBehavior(RequestVoteReply.class);
-
- member3Actor.underlyingActor().clearDropMessagesToBehavior();
- member3Actor.underlyingActor().expectMessageClass(RequestVote.class, 1);
- member3Actor.underlyingActor().forwardCapturedMessageToBehavior(RequestVote.class, member2Actor);
- member3Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
- verifyBehaviorState("member 3", member3Actor, RaftState.Follower);
-
- assertEquals("member 1 election term", 2, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 2, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 2, member3Context.getTermInformation().getCurrentTerm());
-
- // Send ElectionTimeout to member 3 to simulate missing heartbeat from a Leader. member 3
- // should switch to Candidate and send out RequestVote messages. member 1 should grant the
- // vote and send a reply. After receiving the RequestVoteReply, member 3 should switch to leader.
-
- member2Actor.underlyingActor().expectBehaviorStateChange();
- member3Actor.underlyingActor().clear();
- member3Actor.underlyingActor().expectMessageClass(RequestVoteReply.class, 1);
- member3Actor.underlyingActor().expectMessageClass(AppendEntriesReply.class, 2);
-
- member3Actor.tell(new ElectionTimeout(), ActorRef.noSender());
-
- member3Actor.underlyingActor().waitForExpectedMessages(RequestVoteReply.class);
-
- RequestVoteReply requestVoteReply = member3Actor.underlyingActor().getCapturedMessage(RequestVoteReply.class);
- assertEquals("getTerm", member3Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
- assertEquals("isVoteGranted", true, requestVoteReply.isVoteGranted());
-
- verifyBehaviorState("member 3", member3Actor, RaftState.Leader);
-
- // member 2 should've switched to Follower as member 3's RequestVote term (3) was greater
- // than member 2's term (2).
-
- member2Actor.underlyingActor().waitForBehaviorStateChange();
- verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
-
- // The switch to leader should cause an immediate AppendEntries heartbeat from member 3.
-
- member3Actor.underlyingActor().waitForExpectedMessages(AppendEntriesReply.class);
-
- assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 3, member3Context.getTermInformation().getCurrentTerm());
-
- // Now forward the original delayed RequestVoteReply from member 3 to member 2 that granted
- // the vote. Since member 2 is now a Follower, the RequestVoteReply should be ignored.
-
- member2Actor.underlyingActor().clearDropMessagesToBehavior();
- member2Actor.underlyingActor().forwardCapturedMessageToBehavior(RequestVoteReply.class, member3Actor);
-
- member2Actor.underlyingActor().waitForExpectedMessages(RequestVoteReply.class);
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
- verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
- verifyBehaviorState("member 3", member3Actor, RaftState.Leader);
-
- assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 3, member3Context.getTermInformation().getCurrentTerm());
-
- testLog.info("testDelayedMessagesScenario done");
- }
-
- @Test
- public void testPartitionedLeadersScenario() throws Exception {
- testLog.info("Starting testPartitionedLeadersScenario");
-
- TestActorRef<MemberActor> member1Actor = newMemberActor("member1");
- TestActorRef<MemberActor> member2Actor = newMemberActor("member2");
- TestActorRef<MemberActor> member3Actor = newMemberActor("member3");
-
- // Create member 2's behavior initially as Follower
-
- MockRaftActorContext member2Context = newRaftActorContext("member2", member2Actor,
- ImmutableMap.<String,String>builder().
- put("member1", member1Actor.path().toString()).
- put("member3", member3Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
- member2Context.setConfigParams(member2ConfigParams);
-
- Follower member2Behavior = new Follower(member2Context);
- member2Actor.underlyingActor().behavior = member2Behavior;
-
- // Create member 3's behavior initially as Follower
-
- MockRaftActorContext member3Context = newRaftActorContext("member3", member3Actor,
- ImmutableMap.<String,String>builder().
- put("member1", member1Actor.path().toString()).
- put("member2", member2Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
- member3Context.setConfigParams(member3ConfigParams);
-
- Follower member3Behavior = new Follower(member3Context);
- member3Actor.underlyingActor().behavior = member3Behavior;
-
- // Create member 1's behavior initially as Leader
-
- MockRaftActorContext member1Context = newRaftActorContext("member1", member1Actor,
- ImmutableMap.<String,String>builder().
- put("member2", member2Actor.path().toString()).
- put("member3", member3Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
- member1Context.setConfigParams(member1ConfigParams);
-
- initializeLeaderBehavior(member1Actor, member1Context, 2);
-
- member2Actor.underlyingActor().clear();
- member3Actor.underlyingActor().clear();
-
- // Send ElectionTimeout to member 2 to simulate no heartbeat from the Leader (member 1).
- // member 2 should switch to Candidate, start new term 2 and send out RequestVote messages.
- // member 1 will switch to Follower b/c its term is less than the RequestVote term, also it
- // won't send back a reply. member 3 will drop the message (ie won't forward it to its behavior) to
- // simulate loss of network connectivity between member 2 and 3.
-
- member1Actor.underlyingActor().expectMessageClass(RequestVote.class, 1);
-
- member2Actor.underlyingActor().expectBehaviorStateChange();
-
- member3Actor.underlyingActor().dropMessagesToBehavior(RequestVote.class);
-
- member2Actor.tell(new ElectionTimeout(), ActorRef.noSender());
-
- member1Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
- member3Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
-
- // member 1 should switch to Follower as the RequestVote term is greater than its term. It
- // won't send back a RequestVoteReply in this case.
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
-
- // member 2 should switch to Candidate since member 1 didn't reply.
-
- member2Actor.underlyingActor().waitForBehaviorStateChange();
- verifyBehaviorState("member 2", member2Actor, RaftState.Candidate);
-
- assertEquals("member 1 election term", 2, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 2, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 1, member3Context.getTermInformation().getCurrentTerm());
-
- // Send ElectionTimeout to member 3 to simulate no heartbeat from the Leader (member 1).
- // member 2 should switch to Candidate and send out RequestVote messages. member 1 will reply and
- // grant the vote but member 2 will drop the message to simulate loss of network connectivity.
-
- member1Actor.underlyingActor().clear();
- member1Actor.underlyingActor().expectMessageClass(RequestVote.class, 1);
- member1Actor.underlyingActor().expectMessageClass(AppendEntries.class, 1);
-
- member2Actor.underlyingActor().clear();
- member2Actor.underlyingActor().dropMessagesToBehavior(RequestVote.class);
- member2Actor.underlyingActor().dropMessagesToBehavior(AppendEntries.class);
-
- member3Actor.underlyingActor().clear();
- member3Actor.underlyingActor().expectMessageClass(RequestVoteReply.class, 1);
- member3Actor.underlyingActor().expectMessageClass(AppendEntriesReply.class, 1);
-
- member3Actor.tell(new ElectionTimeout(), ActorRef.noSender());
-
- member1Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
- member2Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
- member3Actor.underlyingActor().waitForExpectedMessages(RequestVoteReply.class);
-
- RequestVoteReply requestVoteReply = member3Actor.underlyingActor().getCapturedMessage(RequestVoteReply.class);
- assertEquals("getTerm", member3Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
- assertEquals("isVoteGranted", true, requestVoteReply.isVoteGranted());
-
- // when member 3 switches to Leader it will immediately send out heartbeat AppendEntries to
- // the followers. Wait for AppendEntries to member 1 and its AppendEntriesReply. The
- // AppendEntries message to member 2 is dropped.
-
- member1Actor.underlyingActor().waitForExpectedMessages(AppendEntries.class);
- member2Actor.underlyingActor().waitForExpectedMessages(AppendEntries.class);
- member3Actor.underlyingActor().waitForExpectedMessages(AppendEntriesReply.class);
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
- verifyBehaviorState("member 2", member2Actor, RaftState.Candidate);
- verifyBehaviorState("member 3", member3Actor, RaftState.Leader);
-
- assertEquals("member 1 election term", 2, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 2, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 2, member3Context.getTermInformation().getCurrentTerm());
-
- // member 2 is partitioned from the Leader (member 3) and hasn't received any messages. It
- // would get another ElectionTimeout so simulate that. member 1 should send back a reply
- // granting the vote. Messages (RequestVote and AppendEntries) from member 2 to member 3
- // are dropped to simulate loss of network connectivity. Note member 2 will increment its
- // election term to 3.
-
- member1Actor.underlyingActor().clear();
- member1Actor.underlyingActor().expectMessageClass(AppendEntries.class, 1);
-
- member2Actor.underlyingActor().clear();
- member2Actor.underlyingActor().expectMessageClass(RequestVoteReply.class, 1);
- member2Actor.underlyingActor().expectMessageClass(AppendEntriesReply.class, 1);
-
- member3Actor.underlyingActor().clear();
- member3Actor.underlyingActor().dropMessagesToBehavior(AppendEntries.class);
- member3Actor.underlyingActor().dropMessagesToBehavior(RequestVote.class);
-
- member2Actor.tell(new ElectionTimeout(), ActorRef.noSender());
-
- member2Actor.underlyingActor().waitForExpectedMessages(RequestVoteReply.class);
-
- requestVoteReply = member2Actor.underlyingActor().getCapturedMessage(RequestVoteReply.class);
- assertEquals("getTerm", member2Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
- assertEquals("isVoteGranted", true, requestVoteReply.isVoteGranted());
-
- member3Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
-
- member1Actor.underlyingActor().waitForExpectedMessages(AppendEntries.class);
- member3Actor.underlyingActor().waitForExpectedMessages(AppendEntries.class);
- member2Actor.underlyingActor().waitForExpectedMessages(AppendEntriesReply.class);
-
- // We end up with 2 partitioned leaders both leading member 1. The term for member 1 and 3
- // is 3 and member 3's term is 2.
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
- verifyBehaviorState("member 2", member2Actor, RaftState.Leader);
- verifyBehaviorState("member 3", member3Actor, RaftState.Leader);
-
- assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 2, member3Context.getTermInformation().getCurrentTerm());
-
- // Re-establish connectivity between member 2 and 3, ie stop dropping messages between
- // the 2. Send heartbeats (AppendEntries) from member 3. Both member 1 and 2 should send back
- // an unsuccessful AppendEntriesReply b/c their term (3) is greater than member 3's term (2).
- // This should cause member 3 to switch to Follower.
-
- RaftActorBehavior savedMember1Behavior = member1Actor.underlyingActor().behavior;
- RaftActorBehavior savedMember2Behavior = member2Actor.underlyingActor().behavior;
- RaftActorBehavior savedMember3Behavior = member3Actor.underlyingActor().behavior;
- long savedMember3Term = member3Context.getTermInformation().getCurrentTerm();
- String savedMember3VoterFor = member3Context.getTermInformation().getVotedFor();
-
- member1Actor.underlyingActor().clear();
- member1Actor.underlyingActor().expectMessageClass(AppendEntries.class, 1);
-
- member2Actor.underlyingActor().clear();
- member2Actor.underlyingActor().expectMessageClass(AppendEntries.class, 1);
-
- member3Actor.underlyingActor().clear();
- member3Actor.underlyingActor().expectMessageClass(AppendEntriesReply.class, 1);
-
- sendHeartbeat(member3Actor);
-
- member3Actor.underlyingActor().waitForExpectedMessages(AppendEntriesReply.class);
-
- AppendEntriesReply appendEntriesReply = member3Actor.underlyingActor().
- getCapturedMessage(AppendEntriesReply.class);
- assertEquals("isSuccess", false, appendEntriesReply.isSuccess());
- assertEquals("getTerm", 3, appendEntriesReply.getTerm());
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
- verifyBehaviorState("member 2", member2Actor, RaftState.Leader);
- verifyBehaviorState("member 3", member3Actor, RaftState.Follower);
-
- assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 3, member3Context.getTermInformation().getCurrentTerm());
-
- // Revert back to the partitioned leaders state to test the other sequence where member 2
- // sends heartbeats first before member 3. member 1 should return a successful
- // AppendEntriesReply b/c his term matches member 2's. member 3 should switch to Follower
- // as his term is less than member 2's.
-
- member1Actor.underlyingActor().behavior = savedMember1Behavior;
- member2Actor.underlyingActor().behavior = savedMember2Behavior;
- member3Actor.underlyingActor().behavior = savedMember3Behavior;
-
- member3Context.getTermInformation().update(savedMember3Term, savedMember3VoterFor);
-
- member1Actor.underlyingActor().clear();
- member1Actor.underlyingActor().expectMessageClass(AppendEntries.class, 1);
-
- member2Actor.underlyingActor().clear();
- member2Actor.underlyingActor().expectMessageClass(AppendEntriesReply.class, 1);
-
- member3Actor.underlyingActor().clear();
- member3Actor.underlyingActor().expectMessageClass(AppendEntries.class, 1);
-
- sendHeartbeat(member2Actor);
-
- member1Actor.underlyingActor().waitForExpectedMessages(AppendEntries.class);
- member3Actor.underlyingActor().waitForExpectedMessages(AppendEntries.class);
-
- member2Actor.underlyingActor().waitForExpectedMessages(AppendEntriesReply.class);
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
- verifyBehaviorState("member 2", member2Actor, RaftState.Leader);
- verifyBehaviorState("member 3", member3Actor, RaftState.Follower);
-
- assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", 3, member3Context.getTermInformation().getCurrentTerm());
-
- testLog.info("testPartitionedLeadersScenario done");
- }
-
- @Test
- public void testPartitionedCandidateOnStartupScenario() throws Exception {
- testLog.info("Starting testPartitionedCandidateOnStartupScenario");
-
- TestActorRef<MemberActor> member1Actor = newMemberActor("member1") ;
- TestActorRef<MemberActor> member2Actor = newMemberActor("member2");
- TestActorRef<MemberActor> member3Actor = newMemberActor("member3");
-
- // Create member 2's behavior as Follower.
-
- MockRaftActorContext member2Context = newRaftActorContext("member2", member2Actor,
- ImmutableMap.<String,String>builder().
- put("member1", member1Actor.path().toString()).
- put("member3", member3Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
- member2Context.setConfigParams(member2ConfigParams);
-
- Follower member2Behavior = new Follower(member2Context);
- member2Actor.underlyingActor().behavior = member2Behavior;
-
- // Create member 1's behavior as Leader.
-
- MockRaftActorContext member1Context = newRaftActorContext("member1", member1Actor,
- ImmutableMap.<String,String>builder().
- put("member2", member2Actor.path().toString()).
- put("member3", member3Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
- member1Context.setConfigParams(member1ConfigParams);
-
- initializeLeaderBehavior(member1Actor, member1Context, 1);
-
- member2Actor.underlyingActor().clear();
- member3Actor.underlyingActor().clear();
-
- // Initialize the ReplicatedLog and election term info for member 1 and 2. The current term
- // will be 3 and the last term will be 2.
-
- SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
- replicatedLog.append(new MockReplicatedLogEntry(2, 1, new MockPayload("")));
- replicatedLog.append(new MockReplicatedLogEntry(3, 1, new MockPayload("")));
-
- member1Context.setReplicatedLog(replicatedLog);
- member1Context.getTermInformation().update(3, "");
-
- member2Context.setReplicatedLog(replicatedLog);
- member2Context.getTermInformation().update(3, member1Context.getId());
-
- // Create member 3's behavior initially as a Candidate.
-
- MockRaftActorContext member3Context = newRaftActorContext("member3", member3Actor,
- ImmutableMap.<String,String>builder().
- put("member1", member1Actor.path().toString()).
- put("member2", member2Actor.path().toString()).build());
-
- DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
- member3Context.setConfigParams(member3ConfigParams);
-
- // Initialize the ReplicatedLog and election term info for Candidate member 3. The current term
- // will be 2 and the last term will be 1 so it is behind the leader's log.
-
- SimpleReplicatedLog candidateReplicatedLog = new SimpleReplicatedLog();
- candidateReplicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("")));
- candidateReplicatedLog.append(new MockReplicatedLogEntry(2, 1, new MockPayload("")));
-
- member3Context.setReplicatedLog(candidateReplicatedLog);
- member3Context.getTermInformation().update(2, member1Context.getId());
-
- // The member 3 Candidate will start a new term and send RequestVotes. However it will be
- // partitioned from the cluster by having member 1 and 2 drop its RequestVote messages.
-
- int numCandidateElections = 5;
- long candidateElectionTerm = member3Context.getTermInformation().getCurrentTerm() + numCandidateElections;
-
- member1Actor.underlyingActor().dropMessagesToBehavior(RequestVote.class, numCandidateElections);
-
- member2Actor.underlyingActor().dropMessagesToBehavior(RequestVote.class, numCandidateElections);
-
- Candidate member3Behavior = new Candidate(member3Context);
- member3Actor.underlyingActor().behavior = member3Behavior;
-
- // Send several additional ElectionTimeouts to Candidate member 3. Each ElectionTimeout will
- // start a new term so Candidate member 3's current term will be greater than the leader's
- // current term.
-
- for(int i = 0; i < numCandidateElections - 1; i++) {
- member3Actor.tell(new ElectionTimeout(), ActorRef.noSender());
- }
-
- member1Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
- member2Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Leader);
- verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
- verifyBehaviorState("member 3", member3Actor, RaftState.Candidate);
-
- assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", candidateElectionTerm,
- member3Context.getTermInformation().getCurrentTerm());
-
- // Now send a couple more ElectionTimeouts to Candidate member 3 with the partition resolved.
- //
- // On the first RequestVote, Leader member 1 should switch to Follower as its term (s) is less than
- // the RequestVote's term (8) from member 3. No RequestVoteReply should be sent by member 1.
- // Follower member 2 should update its term since it less than the RequestVote's term and
- // should return a RequestVoteReply but should not grant the vote as its last term and index
- // is greater than the RequestVote's lastLogTerm and lastLogIndex, ie member 2's log is later
- // or more up to date than member 3's.
- //
- // On the second RequestVote, both member 1 and 2 are followers so they should update their
- // term and return a RequestVoteReply but should not grant the vote.
-
- candidateElectionTerm += 2;
- for(int i = 0; i < 2; i++) {
- member1Actor.underlyingActor().clear();
- member1Actor.underlyingActor().expectMessageClass(RequestVote.class, 1);
- member2Actor.underlyingActor().clear();
- member2Actor.underlyingActor().expectMessageClass(RequestVote.class, 1);
- member3Actor.underlyingActor().clear();
- member3Actor.underlyingActor().expectMessageClass(RequestVoteReply.class, 1);
-
- member3Actor.tell(new ElectionTimeout(), ActorRef.noSender());
-
- member1Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
- member2Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
-
- member3Actor.underlyingActor().waitForExpectedMessages(RequestVoteReply.class);
-
- RequestVoteReply requestVoteReply = member3Actor.underlyingActor().getCapturedMessage(RequestVoteReply.class);
- assertEquals("getTerm", member3Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
- assertEquals("isVoteGranted", false, requestVoteReply.isVoteGranted());
- }
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
- verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
- verifyBehaviorState("member 3", member3Actor, RaftState.Candidate);
-
- // Even though member 3 didn't get voted for, member 1 and 2 should have updated their term
- // to member 3's.
-
- assertEquals("member 1 election term", candidateElectionTerm,
- member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", candidateElectionTerm,
- member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", candidateElectionTerm,
- member3Context.getTermInformation().getCurrentTerm());
-
- // At this point we have no leader. Candidate member 3 would continue to start new elections
- // but wouldn't be granted a vote. One of the 2 followers would eventually time out from
- // not having received a heartbeat from a leader and switch to candidate and start a new
- // election. We'll simulate that here by sending an ElectionTimeout to member 1.
-
- member1Actor.underlyingActor().clear();
- member1Actor.underlyingActor().expectMessageClass(RequestVoteReply.class, 1);
- member2Actor.underlyingActor().clear();
- member2Actor.underlyingActor().expectMessageClass(RequestVote.class, 1);
- member3Actor.underlyingActor().clear();
- member3Actor.underlyingActor().expectMessageClass(RequestVote.class, 1);
- member3Actor.underlyingActor().expectBehaviorStateChange();
-
- member1Actor.tell(new ElectionTimeout(), ActorRef.noSender());
-
- member2Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
- member3Actor.underlyingActor().waitForExpectedMessages(RequestVote.class);
-
- // The RequestVoteReply should come from Follower member 2 and the vote should be granted
- // since member 2's last term and index matches member 1's.
-
- member1Actor.underlyingActor().waitForExpectedMessages(RequestVoteReply.class);
-
- RequestVoteReply requestVoteReply = member1Actor.underlyingActor().getCapturedMessage(RequestVoteReply.class);
- assertEquals("getTerm", member1Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
- assertEquals("isVoteGranted", true, requestVoteReply.isVoteGranted());
-
- // Candidate member 3 should change to follower as its term should be less than the
- // RequestVote term (member 1 started a new term higher than the other member's terms).
-
- member3Actor.underlyingActor().waitForBehaviorStateChange();
-
- verifyBehaviorState("member 1", member1Actor, RaftState.Leader);
- verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
- verifyBehaviorState("member 3", member3Actor, RaftState.Follower);
-
- // newTerm should be 10.
-
- long newTerm = candidateElectionTerm + 1;
- assertEquals("member 1 election term", newTerm, member1Context.getTermInformation().getCurrentTerm());
- assertEquals("member 2 election term", newTerm, member2Context.getTermInformation().getCurrentTerm());
- assertEquals("member 3 election term", newTerm, member3Context.getTermInformation().getCurrentTerm());
-
- testLog.info("testPartitionedCandidateOnStartupScenario done");
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.SimpleReplicatedLog;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+
+/**
+ * A leader election scenario test that partitions a candidate when trying to join a cluster on startup.
+ *
+ * @author Thomas Pantelis
+ */
+public class PartitionedCandidateOnStartupElectionScenarioTest extends AbstractLeaderElectionScenarioTest {
+
+ private final int numCandidateElections = 5;
+ private long candidateElectionTerm;
+
+ @Test
+ public void runTest() throws Exception {
+ testLog.info("PartitionedCandidateOnStartupElectionScenarioTest starting");
+
+ setupInitialMember1AndMember2Behaviors();
+
+ setupPartitionedCandidateMember3AndSendElectionTimeouts();
+
+ resolvePartitionAndSendElectionTimeoutsToCandidateMember3();
+
+ sendElectionTimeoutToFollowerMember1();
+
+ testLog.info("PartitionedCandidateOnStartupElectionScenarioTest ending");
+ }
+
+ private void sendElectionTimeoutToFollowerMember1() throws Exception {
+ testLog.info("sendElectionTimeoutToFollowerMember1 starting");
+
+ // At this point we have no leader. Candidate member 3 would continue to start new elections
+ // but wouldn't be granted a vote. One of the 2 followers would eventually time out from
+ // not having received a heartbeat from a leader and switch to candidate and start a new
+ // election. We'll simulate that here by sending an ElectionTimeout to member 1.
+
+ member1Actor.clear();
+ member1Actor.expectMessageClass(RequestVoteReply.class, 1);
+ member2Actor.clear();
+ member2Actor.expectMessageClass(RequestVote.class, 1);
+ member3Actor.clear();
+ member3Actor.expectMessageClass(RequestVote.class, 1);
+ member3Actor.expectBehaviorStateChange();
+
+ member1ActorRef.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ member2Actor.waitForExpectedMessages(RequestVote.class);
+ member3Actor.waitForExpectedMessages(RequestVote.class);
+
+ // The RequestVoteReply should come from Follower member 2 and the vote should be granted
+ // since member 2's last term and index matches member 1's.
+
+ member1Actor.waitForExpectedMessages(RequestVoteReply.class);
+
+ RequestVoteReply requestVoteReply = member1Actor.getCapturedMessage(RequestVoteReply.class);
+ assertEquals("getTerm", member1Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
+ assertEquals("isVoteGranted", true, requestVoteReply.isVoteGranted());
+
+ // Candidate member 3 should change to follower as its term should be less than the
+ // RequestVote term (member 1 started a new term higher than the other member's terms).
+
+ member3Actor.waitForBehaviorStateChange();
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Leader);
+ verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Follower);
+
+ // newTerm should be 10.
+
+ long newTerm = candidateElectionTerm + 1;
+ assertEquals("member 1 election term", newTerm, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", newTerm, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", newTerm, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("sendElectionTimeoutToFollowerMember1 ending");
+ }
+
+ private void resolvePartitionAndSendElectionTimeoutsToCandidateMember3() throws Exception {
+ testLog.info("resolvePartitionAndSendElectionTimeoutsToCandidateMember3 starting");
+
+ // Now send a couple more ElectionTimeouts to Candidate member 3 with the partition resolved.
+ //
+ // On the first RequestVote, Leader member 1 should switch to Follower as its term (s) is less than
+ // the RequestVote's term (8) from member 3. No RequestVoteReply should be sent by member 1.
+ // Follower member 2 should update its term since it less than the RequestVote's term and
+ // should return a RequestVoteReply but should not grant the vote as its last term and index
+ // is greater than the RequestVote's lastLogTerm and lastLogIndex, ie member 2's log is later
+ // or more up to date than member 3's.
+ //
+ // On the second RequestVote, both member 1 and 2 are followers so they should update their
+ // term and return a RequestVoteReply but should not grant the vote.
+
+ candidateElectionTerm += 2;
+ for(int i = 0; i < 2; i++) {
+ member1Actor.clear();
+ member1Actor.expectMessageClass(RequestVote.class, 1);
+ member2Actor.clear();
+ member2Actor.expectMessageClass(RequestVote.class, 1);
+ member3Actor.clear();
+ member3Actor.expectMessageClass(RequestVoteReply.class, 1);
+
+ member3ActorRef.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ member1Actor.waitForExpectedMessages(RequestVote.class);
+ member2Actor.waitForExpectedMessages(RequestVote.class);
+
+ member3Actor.waitForExpectedMessages(RequestVoteReply.class);
+
+ RequestVoteReply requestVoteReply = member3Actor.getCapturedMessage(RequestVoteReply.class);
+ assertEquals("getTerm", member3Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
+ assertEquals("isVoteGranted", false, requestVoteReply.isVoteGranted());
+ }
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
+ verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Candidate);
+
+ // Even though member 3 didn't get voted for, member 1 and 2 should have updated their term
+ // to member 3's.
+
+ assertEquals("member 1 election term", candidateElectionTerm,
+ member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", candidateElectionTerm,
+ member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", candidateElectionTerm,
+ member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("resolvePartitionAndSendElectionTimeoutsToCandidateMember3 ending");
+ }
+
+ private void setupPartitionedCandidateMember3AndSendElectionTimeouts() {
+ testLog.info("setupPartitionedCandidateMember3AndSendElectionTimeouts starting");
+
+ // Create member 3's behavior initially as a Candidate.
+
+ member3Context = newRaftActorContext("member3", member3ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member1", member1ActorRef.path().toString()).
+ put("member2", member2ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
+ member3Context.setConfigParams(member3ConfigParams);
+
+ // Initialize the ReplicatedLog and election term info for Candidate member 3. The current term
+ // will be 2 and the last term will be 1 so it is behind the leader's log.
+
+ SimpleReplicatedLog candidateReplicatedLog = new SimpleReplicatedLog();
+ candidateReplicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("")));
+ candidateReplicatedLog.append(new MockReplicatedLogEntry(2, 1, new MockPayload("")));
+
+ member3Context.setReplicatedLog(candidateReplicatedLog);
+ member3Context.getTermInformation().update(2, member1Context.getId());
+
+ // The member 3 Candidate will start a new term and send RequestVotes. However it will be
+ // partitioned from the cluster by having member 1 and 2 drop its RequestVote messages.
+
+ candidateElectionTerm = member3Context.getTermInformation().getCurrentTerm() + numCandidateElections;
+
+ member1Actor.dropMessagesToBehavior(RequestVote.class, numCandidateElections);
+
+ member2Actor.dropMessagesToBehavior(RequestVote.class, numCandidateElections);
+
+ Candidate member3Behavior = new Candidate(member3Context);
+ member3Actor.behavior = member3Behavior;
+
+ // Send several additional ElectionTimeouts to Candidate member 3. Each ElectionTimeout will
+ // start a new term so Candidate member 3's current term will be greater than the leader's
+ // current term.
+
+ for(int i = 0; i < numCandidateElections - 1; i++) {
+ member3ActorRef.tell(new ElectionTimeout(), ActorRef.noSender());
+ }
+
+ member1Actor.waitForExpectedMessages(RequestVote.class);
+ member2Actor.waitForExpectedMessages(RequestVote.class);
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Leader);
+ verifyBehaviorState("member 2", member2Actor, RaftState.Follower);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Candidate);
+
+ assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", candidateElectionTerm,
+ member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("setupPartitionedCandidateMember3AndSendElectionTimeouts ending");
+ }
+
+ private void setupInitialMember1AndMember2Behaviors() throws Exception {
+ testLog.info("setupInitialMember1AndMember2Behaviors starting");
+
+ // Create member 2's behavior as Follower.
+
+ member2Context = newRaftActorContext("member2", member2ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member1", member1ActorRef.path().toString()).
+ put("member3", member3ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
+ member2Context.setConfigParams(member2ConfigParams);
+
+ Follower member2Behavior = new Follower(member2Context);
+ member2Actor.behavior = member2Behavior;
+
+ // Create member 1's behavior as Leader.
+
+ member1Context = newRaftActorContext("member1", member1ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member2", member2ActorRef.path().toString()).
+ put("member3", member3ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
+ member1Context.setConfigParams(member1ConfigParams);
+
+ initializeLeaderBehavior(member1Actor, member1Context, 1);
+
+ member2Actor.clear();
+ member3Actor.clear();
+
+ // Initialize the ReplicatedLog and election term info for member 1 and 2. The current term
+ // will be 3 and the last term will be 2.
+
+ SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
+ replicatedLog.append(new MockReplicatedLogEntry(2, 1, new MockPayload("")));
+ replicatedLog.append(new MockReplicatedLogEntry(3, 1, new MockPayload("")));
+
+ member1Context.setReplicatedLog(replicatedLog);
+ member1Context.getTermInformation().update(3, "");
+
+ member2Context.setReplicatedLog(replicatedLog);
+ member2Context.getTermInformation().update(3, member1Context.getId());
+
+ testLog.info("setupInitialMember1AndMember2Behaviors ending");
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+
+/**
+ * A leader election scenario test that causes partitioned leaders by dropping messages between 2 members.
+ *
+ * @author Thomas Pantelis
+ */
+public class PartitionedLeadersElectionScenarioTest extends AbstractLeaderElectionScenarioTest {
+
+ /**
+ * This test sets up a scenario with partitioned leaders member 2 and 3 where partitioned leader 3
+ * sends a heartbeat first when connectivity is re-established.
+ */
+ @Test
+ public void runTest1() throws Exception {
+ testLog.info("PartitionedLeadersElectionScenarioTest 1 starting");
+
+ setupInitialMemberBehaviors();
+
+ sendInitialElectionTimeoutToFollowerMember2();
+
+ sendInitialElectionTimeoutToFollowerMember3();
+
+ sendElectionTimeoutToNowCandidateMember2();
+
+ resolvePartitionedLeadersWithLeaderMember3SendingHeartbeatFirst();
+
+ testLog.info("PartitionedLeadersElectionScenarioTest 1 ending");
+ }
+
+ /**
+ * This test sets up a scenario with partitioned leaders member 2 and 3 where partitioned leader 2
+ * sends a heartbeat first when connectivity is re-established.
+ */
+ @Test
+ public void runTest2() throws Exception {
+ testLog.info("PartitionedLeadersElectionScenarioTest 2 starting");
+
+ setupInitialMemberBehaviors();
+
+ sendInitialElectionTimeoutToFollowerMember2();
+
+ sendInitialElectionTimeoutToFollowerMember3();
+
+ sendElectionTimeoutToNowCandidateMember2();
+
+ resolvePartitionedLeadersWithLeaderMember2SendingHeartbeatFirst();
+
+ testLog.info("PartitionedLeadersElectionScenarioTest 2 ending");
+ }
+
+ private void resolvePartitionedLeadersWithLeaderMember2SendingHeartbeatFirst() {
+ testLog.info("resolvePartitionedLeadersWithLeaderMember2SendingHeartbeatFirst starting");
+
+ // Re-establish connectivity between member 2 and 3, ie stop dropping messages between
+ // the 2. Send heartbeats (AppendEntries) from partitioned leader member 2. Follower member 1 should
+ // return a successful AppendEntriesReply b/c its term matches member 2's. member 3 should switch to
+ // Follower as its term is less than member 2's.
+
+ member1Actor.clear();
+ member1Actor.expectMessageClass(AppendEntries.class, 1);
+
+ member2Actor.clear();
+ member2Actor.expectMessageClass(AppendEntriesReply.class, 1);
+
+ member3Actor.clear();
+ member3Actor.expectMessageClass(AppendEntries.class, 1);
+
+ sendHeartbeat(member2ActorRef);
+
+ member1Actor.waitForExpectedMessages(AppendEntries.class);
+ member3Actor.waitForExpectedMessages(AppendEntries.class);
+
+ member2Actor.waitForExpectedMessages(AppendEntriesReply.class);
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
+ verifyBehaviorState("member 2", member2Actor, RaftState.Leader);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Follower);
+
+ assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 3, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("resolvePartitionedLeadersWithLeaderMember2SendingHeartbeatFirst ending");
+ }
+
+ private void resolvePartitionedLeadersWithLeaderMember3SendingHeartbeatFirst() throws Exception {
+ testLog.info("resolvePartitionedLeadersWithLeaderMember3SendingHeartbeatFirst starting");
+
+ // Re-establish connectivity between member 2 and 3, ie stop dropping messages between
+ // the 2. Send heartbeats (AppendEntries) from now leader member 3. Both member 1 and 2 should send
+ // back an unsuccessful AppendEntriesReply b/c their term (3) is greater than member 3's term (2).
+ // This should cause member 3 to switch to Follower.
+
+ member1Actor.clear();
+ member1Actor.expectMessageClass(AppendEntries.class, 1);
+
+ member2Actor.clear();
+ member2Actor.expectMessageClass(AppendEntries.class, 1);
+
+ member3Actor.clear();
+ member3Actor.expectMessageClass(AppendEntriesReply.class, 1);
+
+ sendHeartbeat(member3ActorRef);
+
+ member3Actor.waitForExpectedMessages(AppendEntriesReply.class);
+
+ AppendEntriesReply appendEntriesReply = member3Actor.getCapturedMessage(AppendEntriesReply.class);
+ assertEquals("isSuccess", false, appendEntriesReply.isSuccess());
+ assertEquals("getTerm", 3, appendEntriesReply.getTerm());
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
+ verifyBehaviorState("member 2", member2Actor, RaftState.Leader);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Follower);
+
+ assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 3, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("resolvePartitionedLeadersWithLeaderMember3SendingHeartbeatFirst ending");
+ }
+
+ private void sendElectionTimeoutToNowCandidateMember2() throws Exception {
+ testLog.info("sendElectionTimeoutToNowCandidateMember2 starting");
+
+ // member 2, now a candidate, is partitioned from the Leader (now member 3) and hasn't received any
+ // messages. It would get another ElectionTimeout so simulate that. member 1 should send back a reply
+ // granting the vote. Messages (RequestVote and AppendEntries) from member 2 to member 3
+ // are dropped to simulate loss of network connectivity. Note member 2 will increment its
+ // election term to 3.
+
+ member1Actor.clear();
+ member1Actor.expectMessageClass(AppendEntries.class, 1);
+
+ member2Actor.clear();
+ member2Actor.expectMessageClass(RequestVoteReply.class, 1);
+ member2Actor.expectMessageClass(AppendEntriesReply.class, 1);
+
+ member3Actor.clear();
+ member3Actor.dropMessagesToBehavior(AppendEntries.class);
+ member3Actor.dropMessagesToBehavior(RequestVote.class);
+
+ member2ActorRef.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ member2Actor.waitForExpectedMessages(RequestVoteReply.class);
+
+ RequestVoteReply requestVoteReply = member2Actor.getCapturedMessage(RequestVoteReply.class);
+ assertEquals("getTerm", member2Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
+ assertEquals("isVoteGranted", true, requestVoteReply.isVoteGranted());
+
+ member3Actor.waitForExpectedMessages(RequestVote.class);
+
+ member1Actor.waitForExpectedMessages(AppendEntries.class);
+ member3Actor.waitForExpectedMessages(AppendEntries.class);
+ member2Actor.waitForExpectedMessages(AppendEntriesReply.class);
+
+ // We end up with 2 partitioned leaders both leading member 1. The term for member 1 and 3
+ // is 3 and member 3's term is 2.
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
+ verifyBehaviorState("member 2", member2Actor, RaftState.Leader);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Leader);
+
+ assertEquals("member 1 election term", 3, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 3, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 2, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("sendElectionTimeoutToNowCandidateMember2 ending");
+ }
+
+ private void sendInitialElectionTimeoutToFollowerMember3() throws Exception {
+ testLog.info("sendInitialElectionTimeoutToFollowerMember3 starting");
+
+ // Send ElectionTimeout to member 3 to simulate no heartbeat from a Leader (originally member 1).
+ // member 3 should switch to Candidate and send out RequestVote messages. member 1, now a follower,
+ // should reply and grant the vote but member 2 will drop the message to simulate loss of network
+ // connectivity between members 2 and 3. member 3 should switch to leader.
+
+ member1Actor.clear();
+ member1Actor.expectMessageClass(RequestVote.class, 1);
+ member1Actor.expectMessageClass(AppendEntries.class, 1);
+
+ member2Actor.clear();
+ member2Actor.dropMessagesToBehavior(RequestVote.class);
+ member2Actor.dropMessagesToBehavior(AppendEntries.class);
+
+ member3Actor.clear();
+ member3Actor.expectMessageClass(RequestVoteReply.class, 1);
+ member3Actor.expectMessageClass(AppendEntriesReply.class, 1);
+
+ member3ActorRef.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ member1Actor.waitForExpectedMessages(RequestVote.class);
+ member2Actor.waitForExpectedMessages(RequestVote.class);
+ member3Actor.waitForExpectedMessages(RequestVoteReply.class);
+
+ RequestVoteReply requestVoteReply = member3Actor.getCapturedMessage(RequestVoteReply.class);
+ assertEquals("getTerm", member3Context.getTermInformation().getCurrentTerm(), requestVoteReply.getTerm());
+ assertEquals("isVoteGranted", true, requestVoteReply.isVoteGranted());
+
+ // when member 3 switches to Leader it will immediately send out heartbeat AppendEntries to
+ // the followers. Wait for AppendEntries to member 1 and its AppendEntriesReply. The
+ // AppendEntries message to member 2 is dropped.
+
+ member1Actor.waitForExpectedMessages(AppendEntries.class);
+ member2Actor.waitForExpectedMessages(AppendEntries.class);
+ member3Actor.waitForExpectedMessages(AppendEntriesReply.class);
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
+ verifyBehaviorState("member 2", member2Actor, RaftState.Candidate);
+ verifyBehaviorState("member 3", member3Actor, RaftState.Leader);
+
+ assertEquals("member 1 election term", 2, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 2, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 2, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("sendInitialElectionTimeoutToFollowerMember3 ending");
+ }
+
+ private void sendInitialElectionTimeoutToFollowerMember2() {
+ testLog.info("sendInitialElectionTimeoutToFollowerMember2 starting");
+
+ // Send ElectionTimeout to member 2 to simulate no heartbeat from the Leader (member 1).
+ // member 2 should switch to Candidate, start new term 2 and send out RequestVote messages.
+ // member 1 will switch to Follower b/c its term is less than the member 2's RequestVote term, also it
+ // won't send back a reply. member 3 will drop the message (ie won't forward it to its behavior) to
+ // simulate loss of network connectivity between members 2 and 3.
+
+ member1Actor.expectMessageClass(RequestVote.class, 1);
+
+ member2Actor.expectBehaviorStateChange();
+
+ member3Actor.dropMessagesToBehavior(RequestVote.class);
+
+ member2ActorRef.tell(new ElectionTimeout(), ActorRef.noSender());
+
+ member1Actor.waitForExpectedMessages(RequestVote.class);
+ member3Actor.waitForExpectedMessages(RequestVote.class);
+
+ // Original leader member 1 should switch to Follower as the RequestVote term is greater than its
+ // term. It won't send back a RequestVoteReply in this case.
+
+ verifyBehaviorState("member 1", member1Actor, RaftState.Follower);
+
+ // member 2 should switch to Candidate since it didn't get a RequestVoteReply from the other 2 members.
+
+ member2Actor.waitForBehaviorStateChange();
+ verifyBehaviorState("member 2", member2Actor, RaftState.Candidate);
+
+ assertEquals("member 1 election term", 2, member1Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 2 election term", 2, member2Context.getTermInformation().getCurrentTerm());
+ assertEquals("member 3 election term", 1, member3Context.getTermInformation().getCurrentTerm());
+
+ testLog.info("sendInitialElectionTimeoutToFollowerMember2 ending");
+ }
+
+ private void setupInitialMemberBehaviors() throws Exception {
+ testLog.info("setupInitialMemberBehaviors starting");
+
+ // Create member 2's behavior initially as Follower
+
+ member2Context = newRaftActorContext("member2", member2ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member1", member1ActorRef.path().toString()).
+ put("member3", member3ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member2ConfigParams = newConfigParams();
+ member2Context.setConfigParams(member2ConfigParams);
+
+ Follower member2Behavior = new Follower(member2Context);
+ member2Actor.behavior = member2Behavior;
+
+ // Create member 3's behavior initially as Follower
+
+ member3Context = newRaftActorContext("member3", member3ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member1", member1ActorRef.path().toString()).
+ put("member2", member2ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member3ConfigParams = newConfigParams();
+ member3Context.setConfigParams(member3ConfigParams);
+
+ Follower member3Behavior = new Follower(member3Context);
+ member3Actor.behavior = member3Behavior;
+
+ // Create member 1's behavior initially as Leader
+
+ member1Context = newRaftActorContext("member1", member1ActorRef,
+ ImmutableMap.<String,String>builder().
+ put("member2", member2ActorRef.path().toString()).
+ put("member3", member3ActorRef.path().toString()).build());
+
+ DefaultConfigParamsImpl member1ConfigParams = newConfigParams();
+ member1Context.setConfigParams(member1ConfigParams);
+
+ initializeLeaderBehavior(member1Actor, member1Context, 2);
+
+ member2Actor.clear();
+ member3Actor.clear();
+
+ testLog.info("setupInitialMemberBehaviors ending");
+ }
+}