import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
-import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
import scala.concurrent.duration.FiniteDuration;
-public class LeaderTest extends AbstractLeaderTest {
+public class LeaderTest extends AbstractLeaderTest<Leader> {
static final String FOLLOWER_ID = "follower";
public static final String LEADER_ID = "leader";
leader = new Leader(createActorContext());
- // handle message should return the Leader state when it receives an
- // unknown message
- RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
- Assert.assertTrue(behavior instanceof Leader);
+ // handle message should null when it receives an unknown message
+ assertNull(leader.handleMessage(followerActor, "foo"));
}
@Test
logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setCommitIndex(-1);
short payloadVersion = (short)5;
actorContext.setPayloadVersion(payloadVersion);
actorContext.getTermInformation().update(term, "");
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
// Leader should send an immediate heartbeat with no entries as follower is inactive.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setCommitIndex(-1);
// The raft context is initialized with a couple log entries. However the commitIndex
// is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
actorContext.getTermInformation().update(newTerm, "");
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
// Leader will send an immediate heartbeat - ignore it.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Wait slightly longer than heartbeat duration
Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
for(int i=0;i<3;i++) {
Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
}
List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
followerActor.underlyingActor().clear();
Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
sendReplicate(actorContext, lastIndex+1);
List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
actorContext.getReplicatedLog().append(newEntry);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new Replicate(leaderActor, "state-id", newEntry));
+ final Identifier id = new MockIdentifier("state-id");
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
// State should not change
assertTrue(raftBehavior instanceof Leader);
ApplyState last = applyStateList.get((int) newLogIndex - 1);
assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
- assertEquals("getIdentifier", "state-id", last.getIdentifier());
+ assertEquals("getIdentifier", id, last.getIdentifier());
}
@Test
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
//send first chunk and no InstallSnapshotReply received yet
Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
- AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
- AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+ AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
//InstallSnapshotReply received
fts.markSendStatus(true);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
- leaderActor, new Replicate(null, "state-id", entry));
+ leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
assertTrue(raftBehavior instanceof Leader);
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
- leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
assertEquals(2, cs.getLastTerm());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
- Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
@Test
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
- // if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+ // if an initiate is started again when first is in progress, it should not initiate Capture
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
- Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
- Assert.assertNull(installSnapshot);
+ assertNull(installSnapshot);
}
Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
+ assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
+ installSnapshot.getLastChunkHashCode().get().intValue());
- int hashCode = installSnapshot.getData().hashCode();
+ int hashCode = Arrays.hashCode(installSnapshot.getData());
followerActor.underlyingActor().clear();
ByteString bs = toByteString(leadersSnapshot);
byte[] barray = bs.toByteArray();
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
assertEquals(bs.size(), barray.length);
j = barray.length;
}
- ByteString chunk = fts.getNextChunk();
- assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+ byte[] chunk = fts.getNextChunk();
+ assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
fts.markSendStatus(true);
assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
}
- @Override protected RaftActorBehavior createBehavior(
- RaftActorContext actorContext) {
+ @Override
+ protected Leader createBehavior(final RaftActorContext actorContext) {
return new Leader(actorContext);
}
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
followerActorContext.setCommitIndex(1);
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
Map<String, String> leaderPeerAddresses = new HashMap<>();
leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
- assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
assertEquals("Log entries size", 2, appendEntries.getEntries().size());
assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
leader = new Leader(leaderActorContext);
assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
leader.handleMessage(followerActor, appendEntriesReply);
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
leader = new Leader(leaderActorContext);
assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
leader.handleMessage(followerActor, appendEntriesReply);
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
leader = new Leader(leaderActorContext);
assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
leader.handleMessage(followerActor, appendEntriesReply);
appendEntries = appendEntriesList.get(1);
assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
- assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
assertEquals("Log entries size", 2, appendEntries.getEntries().size());
assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
MockRaftActorContext leaderActorContext = createActorContext();
leader = new Leader(leaderActorContext);
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue(behavior instanceof Leader);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue(behavior instanceof Leader);
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckNoVotingFollowers() {
+ logStart("testIsolatedLeaderCheckNoVotingFollowers");
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
+
+ leader = new Leader(leaderActorContext);
+ leader.getFollower(FOLLOWER_ID).markFollowerActive();
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Expected Leader", behavior instanceof Leader);
}
private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
leader.markFollowerActive("follower-1");
leader.markFollowerActive("follower-2");
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue("Behavior not instance of Leader when all followers are active",
- behavior instanceof Leader);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
// kill 1 follower and verify if that got killed
final JavaTestKit probe = new JavaTestKit(getSystem());
leader.markFollowerInActive("follower-1");
leader.markFollowerActive("follower-2");
- behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
- behavior instanceof Leader);
+ behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
// kill 2nd follower and leader should change to Isolated leader
followerActor2.tell(PoisonPill.getInstance(), null);
assertEquals(termMsg2.getActor(), followerActor2);
leader.markFollowerInActive("follower-2");
- return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
}
@Test
RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
- Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
behavior instanceof IsolatedLeader);
}
RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
- Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
+ assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
behavior instanceof Leader);
}
new FiniteDuration(1000, TimeUnit.SECONDS));
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
String nonVotingFollowerId = "nonvoting-follower";
TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Ignore initial heartbeats
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}
new FiniteDuration(200, TimeUnit.MILLISECONDS));
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
}
verify(mockTransferCohort).abortTransfer();
}
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());