+ @Test
+ public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+ logStart("testHandleOutOfSequenceInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+
+ InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
+ getNextChunk(bsSnapshot, 10, 50), 3, 3);
+ follower.handleMessage(leaderActor, installSnapshot);
+
+ InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ InstallSnapshotReply.class);
+
+ assertEquals("isSuccess", false, reply.isSuccess());
+ assertEquals("getChunkIndex", -1, reply.getChunkIndex());
+ assertEquals("getTerm", 1, reply.getTerm());
+ assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
+
+ assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+ }
+
+ @Test
+ public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ follower = createBehavior(context);
+
+ TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+ assertTrue("Expected Candidate", newBehavior instanceof Candidate);
+ }
+
+ @Test
+ public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
+ MockRaftActorContext context = createActorContext();
+ context.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ context.setRaftPolicy(createRaftPolicy(false, false));
+
+ follower = createBehavior(context);
+
+ TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+ assertSame("handleMessage result", follower, newBehavior);
+ }
+
+ @Test
+ public void testFollowerSchedulesElectionIfNonVoting() {
+ MockRaftActorContext context = createActorContext();
+ context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
+ FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
+
+ follower = new Follower(context, "leader", (short)1);
+
+ ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
+ ElectionTimeout.class);
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
+ assertSame("handleMessage result", follower, newBehavior);
+ assertNull("Expected null leaderId", follower.getLeaderId());
+ }
+
+ @Test
+ public void testElectionScheduledWhenAnyRaftRPCReceived() {
+ MockRaftActorContext context = createActorContext();
+ follower = createBehavior(context);
+ follower.handleMessage(leaderActor, new RaftRPC() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTerm() {
+ return 100;
+ }
+ });
+ verify(follower).scheduleElection(any(FiniteDuration.class));
+ }
+
+ @Test
+ public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
+ MockRaftActorContext context = createActorContext();
+ follower = createBehavior(context);
+ follower.handleMessage(leaderActor, "non-raft-rpc");
+ verify(follower, never()).scheduleElection(any(FiniteDuration.class));
+ }
+
+ public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
+ int snapshotLength = bs.size();
+ int start = offset;
+ int size = chunkSize;
+ if (chunkSize > snapshotLength) {
+ size = snapshotLength;
+ } else {
+ if (start + chunkSize > snapshotLength) {
+ size = snapshotLength - start;
+ }
+ }
+
+ byte[] nextChunk = new byte[size];
+ bs.copyTo(nextChunk, start, 0, size);
+ return nextChunk;
+ }
+
+ private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+ String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+ expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
+ }
+
+ private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+ String expFollowerId, long expLogLastTerm, long expLogLastIndex,
+ boolean expForceInstallSnapshot) {
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ AppendEntriesReply.class);
+
+ assertEquals("isSuccess", expSuccess, reply.isSuccess());
+ assertEquals("getTerm", expTerm, reply.getTerm());
+ assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
+ assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
+ assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
+ assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
+ assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
+ }
+
+
+ private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
+ return new SimpleReplicatedLogEntry(index, term,
+ new MockRaftActorContext.MockPayload(data));
+ }
+
+ private ByteString createSnapshot() {
+ HashMap<String, String> followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ return toByteString(followerSnapshot);
+ }
+
+ @Override
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
+ ActorRef actorRef, RaftRPC rpc) throws Exception {
+ super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
+
+ String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
+ assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
+ }
+
+ @Override
+ protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
+ throws Exception {
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
+ assertEquals("isSuccess", true, reply.isSuccess());
+ }