2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.Props;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Stopwatch;
24 import com.google.protobuf.ByteString;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftActorContext;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.Snapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
40 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import scala.concurrent.duration.FiniteDuration;
51 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Collector actor that backs the follower's context (see createActorContext()); the tests read
// messages such as ElectionTimeout, FollowerInitialSyncUpStatus and ApplySnapshot from it.
53 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
54 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
// Collector actor passed as the first argument of handleMessage in the tests; the follower's
// replies (RequestVoteReply, AppendEntriesReply, InstallSnapshotReply) are read from it.
56 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
57 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
// Behavior under test; assigned by each test and checked for null in tearDown.
59 private RaftActorBehavior follower;
// Payload version applied to every context built by createActorContext(ActorRef).
61 private final short payloadVersion = 5;
// Test tear-down hook. Only the null guard on the follower field is visible in this view.
// NOTE(review): the guarded cleanup and the remainder of the body are on lines not shown here.
65 public void tearDown() throws Exception {
66 if(follower != null) {
// Creates the behavior under test: a TestFollower built on the given context.
// NOTE(review): presumably overrides a factory hook of AbstractRaftActorBehaviorTest - confirm.
74 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
75 return new TestFollower(actorContext);
// Builds a mock context backed by followerActor by delegating to createActorContext(ActorRef).
79 protected MockRaftActorContext createActorContext() {
80 return createActorContext(followerActor);
// Builds a MockRaftActorContext with id "follower" on the test actor system, backed by the given
// actor, and stamps it with this test class's payloadVersion.
// NOTE(review): the return statement is on a line not shown in this view.
84 protected MockRaftActorContext createActorContext(ActorRef actorRef){
85 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
86 context.setPayloadVersion(payloadVersion );
// Returns the election-timeout count tracked by a TestFollower.
// NOTE(review): the result for any other behavior type is on a line not shown in this view.
90 private static int getElectionTimeoutCount(RaftActorBehavior follower){
91 if(follower instanceof TestFollower){
92 return ((TestFollower) follower).getElectionTimeoutCount();
// Verifies that merely constructing a Follower leads to an ElectionTimeout message arriving at
// followerActor.
98 public void testThatAnElectionTimeoutIsTriggered(){
99 MockRaftActorContext actorContext = createActorContext();
// A plain Follower is used here rather than the TestFollower returned by createBehavior.
100 follower = new Follower(actorContext);
// Third argument: six times the configured election timeout interval, in milliseconds
// (presumably the maximum time to wait for the message - confirm against MessageCollectorActor).
102 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
103 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
// Verifies that handling an ElectionTimeout message makes a Follower return a Candidate behavior.
107 public void testHandleElectionTimeout(){
108 logStart("testHandleElectionTimeout");
110 follower = new Follower(createActorContext());
112 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
// The behavior returned by handleMessage is the one the actor should switch to.
114 assertTrue(raftBehavior instanceof Candidate);
// Verifies that a RequestVote whose term equals the follower's current term is granted when the
// follower has not voted yet, and that the TestFollower's election-timeout count is 1 afterwards.
// NOTE(review): `term` is declared on a line not shown in this view.
118 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
119 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
121 RaftActorContext context = createActorContext();
// Current term = term, no vote recorded (second argument null).
123 context.getTermInformation().update(term, null);
125 follower = createBehavior(context);
127 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
// The reply is sent to the actor that was passed in as the sender.
129 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
131 assertEquals("isVoteGranted", true, reply.isVoteGranted());
132 assertEquals("getTerm", term, reply.getTerm());
133 assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
// Verifies that a RequestVote for the current term is denied when the follower has already voted
// for a different candidate, and that the TestFollower's election-timeout count stays at 0.
// NOTE(review): `term` is declared on a line not shown in this view.
137 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
138 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
140 RaftActorContext context = createActorContext();
// The follower has already voted for "test" in this term.
142 context.getTermInformation().update(term, "test");
144 follower = createBehavior(context);
// The request comes from a different candidate id ("candidate").
146 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
148 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
150 assertEquals("isVoteGranted", false, reply.isVoteGranted());
151 assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
// Verifies that the first AppendEntries is accepted when the entry it names as previous
// (index 100, term 1) is present in the follower's log, and that the follower reports that its
// initial sync is not yet done.
156 public void testHandleFirstAppendEntries() throws Exception {
157 logStart("testHandleFirstAppendEntries");
159 MockRaftActorContext context = createActorContext();
// Reset the mock log so that it holds exactly one entry (term 1, index 100) on top of a
// snapshot index of 99; the size assertion below confirms the single entry.
160 context.getReplicatedLog().clear(0,2);
161 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
162 context.getReplicatedLog().setSnapshotIndex(99);
164 List<ReplicatedLogEntry> entries = Arrays.asList(
165 newReplicatedLogEntry(2, 101, "foo"));
167 Assert.assertEquals(1, context.getReplicatedLog().size());
169 // The new commitIndex is 101
// Argument order assumed: term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
// replicatedToAllIndex, payloadVersion - TODO confirm against AppendEntries.
170 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
172 follower = createBehavior(context);
173 follower.handleMessage(leaderActor, appendEntries);
// Sync status is read from followerActor, the RPC reply from leaderActor.
175 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
176 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
178 assertFalse(syncStatus.isInitialSyncDone());
179 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies that a first AppendEntries carrying prevLogIndex/prevLogTerm of -1 is rejected when
// the mock context's log is left untouched, and that initial sync is reported as not done.
183 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
// Log the name of this test (the original logged the name of a sibling test).
184 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOne");
186 MockRaftActorContext context = createActorContext();
188 List<ReplicatedLogEntry> entries = Arrays.asList(
189 newReplicatedLogEntry(2, 101, "foo"));
191 // The new commitIndex is 101
// prevLogIndex and prevLogTerm are both -1; the second-to-last argument (100) is the
// replicatedToAllIndex referred to by the sibling tests' names.
192 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
194 follower = createBehavior(context);
195 follower.handleMessage(leaderActor, appendEntries);
// Sync status is read from followerActor, the RPC reply from leaderActor.
197 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
198 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
200 assertFalse(syncStatus.isInitialSyncDone());
201 assertFalse("append entries reply should be false", reply.isSuccess());
// Verifies that a first AppendEntries with prevLogIndex/prevLogTerm of -1 is accepted when the
// entry at the message's replicatedToAllIndex (100) is present in the follower's log.
205 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
// Log the name of this test (the original logged the name of a sibling test).
206 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
208 MockRaftActorContext context = createActorContext();
// Reset the mock log, then leave a single entry (term 1, index 100) above snapshot index 99.
209 context.getReplicatedLog().clear(0,2);
210 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
211 context.getReplicatedLog().setSnapshotIndex(99);
213 List<ReplicatedLogEntry> entries = Arrays.asList(
214 newReplicatedLogEntry(2, 101, "foo"));
216 // The new commitIndex is 101
217 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
219 follower = createBehavior(context);
220 follower.handleMessage(leaderActor, appendEntries);
// Sync status is read from followerActor, the RPC reply from leaderActor.
222 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
223 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
225 assertFalse(syncStatus.isInitialSyncDone());
226 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies that a first AppendEntries with prevLogIndex/prevLogTerm of -1 is accepted when the
// message's replicatedToAllIndex (100) equals the follower's snapshot index and the log itself
// holds no entries.
230 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
// Log the name of this test (the original logged the name of a sibling test).
231 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
233 MockRaftActorContext context = createActorContext();
// Reset the mock log and record index 100 as covered by the snapshot.
234 context.getReplicatedLog().clear(0,2);
235 context.getReplicatedLog().setSnapshotIndex(100);
237 List<ReplicatedLogEntry> entries = Arrays.asList(
238 newReplicatedLogEntry(2, 101, "foo"));
240 // The new commitIndex is 101
241 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
243 follower = createBehavior(context);
244 follower.handleMessage(leaderActor, appendEntries);
// Sync status is read from followerActor, the RPC reply from leaderActor.
246 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
247 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
249 assertFalse(syncStatus.isInitialSyncDone());
250 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies that a first AppendEntries with prevLogIndex/prevLogTerm of -1 is rejected when the
// replicatedToAllIndex (100) is covered by the follower's snapshot but the entry sent starts at
// index 105, leaving a gap after the snapshot.
254 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
// Log the name of this test (the original logged the name of a sibling test).
255 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing");
257 MockRaftActorContext context = createActorContext();
// Reset the mock log and record index 100 as covered by the snapshot.
258 context.getReplicatedLog().clear(0,2);
259 context.getReplicatedLog().setSnapshotIndex(100);
261 List<ReplicatedLogEntry> entries = Arrays.asList(
262 newReplicatedLogEntry(2, 105, "foo"));
264 // The new commitIndex is 105
265 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
267 follower = createBehavior(context);
268 follower.handleMessage(leaderActor, appendEntries);
// Sync status is read from followerActor, the RPC reply from leaderActor.
270 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
271 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
273 assertFalse(syncStatus.isInitialSyncDone());
274 assertFalse("append entries reply should be false", reply.isSuccess());
// Walks the follower through initial sync-up: the first AppendEntries yields a "sync not done"
// status, a later one (sent after the context has been advanced to index 101) yields "sync done",
// and repeating that message produces no further status notification.
278 public void testHandleSyncUpAppendEntries() throws Exception {
279 logStart("testHandleSyncUpAppendEntries");
281 MockRaftActorContext context = createActorContext();
283 List<ReplicatedLogEntry> entries = Arrays.asList(
284 newReplicatedLogEntry(2, 101, "foo"));
286 // The new commitIndex is 101
287 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
289 follower = createBehavior(context);
290 follower.handleMessage(leaderActor, appendEntries);
292 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
294 assertFalse(syncStatus.isInitialSyncDone());
296 // Clear all the messages
297 followerActor.underlyingActor().clear();
// Put the context into a caught-up state: applied and committed through index 101, with the
// last log entry at index 101.
299 context.setLastApplied(101);
300 context.setCommitIndex(101);
301 setLastLogEntry(context, 1, 101,
302 new MockRaftActorContext.MockPayload(""));
304 entries = Arrays.asList(
305 newReplicatedLogEntry(2, 101, "foo"));
307 // The new commitIndex is 102 (prevLogIndex 101, replicatedToAllIndex 101)
308 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
309 follower.handleMessage(leaderActor, appendEntries);
311 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
313 assertTrue(syncStatus.isInitialSyncDone());
315 followerActor.underlyingActor().clear();
317 // Sending the same message again should not generate another message
319 follower.handleMessage(leaderActor, appendEntries);
// getFirstMatching is used here because the result is asserted to be null.
321 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
323 assertNull(syncStatus);
// Verifies that when a different leader id shows up before initial sync has completed, the
// follower again publishes a FollowerInitialSyncUpStatus saying sync is not done.
328 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
329 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
331 MockRaftActorContext context = createActorContext();
333 List<ReplicatedLogEntry> entries = Arrays.asList(
334 newReplicatedLogEntry(2, 101, "foo"));
336 // The new commitIndex is 101
337 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
339 follower = createBehavior(context);
340 follower.handleMessage(leaderActor, appendEntries);
342 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
344 assertFalse(syncStatus.isInitialSyncDone());
346 // Clear all the messages
347 followerActor.underlyingActor().clear();
// The follower has applied through index 100 and its last log entry is at index 100.
349 context.setLastApplied(100);
350 setLastLogEntry(context, 1, 100,
351 new MockRaftActorContext.MockPayload(""));
353 entries = Arrays.asList(
354 newReplicatedLogEntry(2, 101, "foo"));
356 // leader-2 is becoming the leader now; it sends prevLogIndex 45 and commitIndex 46
357 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
358 follower.handleMessage(leaderActor, appendEntries);
360 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
362 // We get a new message saying initial status is not done
363 assertFalse(syncStatus.isInitialSyncDone());
// Verifies that after initial sync has been reported as done, an AppendEntries from a different
// leader id makes the follower publish a new status saying sync is not done.
369 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
370 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
372 MockRaftActorContext context = createActorContext();
374 List<ReplicatedLogEntry> entries = Arrays.asList(
375 newReplicatedLogEntry(2, 101, "foo"));
377 // The new commitIndex is 101
378 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
380 follower = createBehavior(context);
381 follower.handleMessage(leaderActor, appendEntries);
383 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
385 assertFalse(syncStatus.isInitialSyncDone());
387 // Clear all the messages
388 followerActor.underlyingActor().clear();
// Phase 2: put the context into a caught-up state (applied/committed through 101).
390 context.setLastApplied(101);
391 context.setCommitIndex(101);
392 setLastLogEntry(context, 1, 101,
393 new MockRaftActorContext.MockPayload(""));
395 entries = Arrays.asList(
396 newReplicatedLogEntry(2, 101, "foo"));
398 // The new commitIndex is 102 (prevLogIndex 101, replicatedToAllIndex 101)
399 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
400 follower.handleMessage(leaderActor, appendEntries);
402 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
404 assertTrue(syncStatus.isInitialSyncDone());
406 // Clear all the messages
407 followerActor.underlyingActor().clear();
// Phase 3: a different leader id appears.
409 context.setLastApplied(100);
410 setLastLogEntry(context, 1, 100,
411 new MockRaftActorContext.MockPayload(""));
413 entries = Arrays.asList(
414 newReplicatedLogEntry(2, 101, "foo"));
416 // leader-2 is becoming the leader now; it sends prevLogIndex 45 and commitIndex 46
417 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
418 follower.handleMessage(leaderActor, appendEntries);
420 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
422 // We get a new message saying initial status is not done
423 assertFalse(syncStatus.isInitialSyncDone());
429 * This test verifies that when an AppendEntries RPC is received by a RaftActor
430 * with a commitIndex that is greater than what has been applied to the
431 * state machine of the RaftActor, the RaftActor applies the state and
432 * sets its current applied state to the commitIndex of the sender.
// Verifies that an AppendEntries whose commit index (101) is ahead of the follower's lastApplied
// (100) results in the context's lastApplied being moved to 101.
437 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
438 logStart("testHandleAppendEntriesWithNewerCommitIndex");
440 MockRaftActorContext context = createActorContext();
// Follower state before the message: applied through 100, last log entry at index 100 (term 1),
// snapshot index 99.
442 context.setLastApplied(100);
443 setLastLogEntry(context, 1, 100,
444 new MockRaftActorContext.MockPayload(""));
445 context.getReplicatedLog().setSnapshotIndex(99);
447 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
448 newReplicatedLogEntry(2, 101, "foo"));
450 // The new commitIndex is 101
451 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
453 follower = createBehavior(context);
454 follower.handleMessage(leaderActor, appendEntries);
456 assertEquals("getLastApplied", 101L, context.getLastApplied());
460 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
461 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
462 * then the RaftActor does not change its state and it returns a failure.
// Verifies that an AppendEntries sent with prevLogIndex 0 / prevLogTerm 0 and no entries is
// answered with an unsuccessful reply, and that handleMessage returns the same follower instance.
467 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
468 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
470 MockRaftActorContext context = createActorContext();
472 // First set the receivers term to lower number
473 context.getTermInformation().update(95, "test");
475 // AppendEntries is now sent with a bigger term
476 // this will set the receivers term to be the same as the sender's term
477 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
479 follower = createBehavior(context);
481 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// Same instance returned: no behavior change.
483 Assert.assertSame(follower, newBehavior);
485 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
486 AppendEntriesReply.class);
488 assertEquals("isSuccess", false, reply.isSuccess());
492 * This test verifies that when a new AppendEntries message is received with
493 * new entries and the logs of the sender and receiver match that the new
494 * entries get added to the log and the log is incremented by the number of
495 * entries received in appendEntries
// Verifies that entries following the receiver's last log entry are appended to the log, that
// the behavior records the leader's id and payload version, and that a successful reply is sent.
500 public void testHandleAppendEntriesAddNewEntries() {
501 logStart("testHandleAppendEntriesAddNewEntries");
503 MockRaftActorContext context = createActorContext();
505 // First set the receivers term to lower number
506 context.getTermInformation().update(1, "test");
508 // Prepare the receivers log
509 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
510 log.append(newReplicatedLogEntry(1, 0, "zero"));
511 log.append(newReplicatedLogEntry(1, 1, "one"));
512 log.append(newReplicatedLogEntry(1, 2, "two"));
514 context.setReplicatedLog(log);
516 // Prepare the entries to be sent with AppendEntries
517 List<ReplicatedLogEntry> entries = new ArrayList<>();
518 entries.add(newReplicatedLogEntry(1, 3, "three"));
519 entries.add(newReplicatedLogEntry(1, 4, "four"));
521 // Send appendEntries with the same term as was set on the receiver
522 // before the new behavior was created (1 in this case)
523 // This will not work for a Candidate because as soon as a Candidate
524 // is created it increments the term
525 short leaderPayloadVersion = 10;
526 String leaderId = "leader-1";
// prevLogIndex 2 / prevLogTerm 1 match the receiver's last entry prepared above.
527 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
529 follower = createBehavior(context);
531 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
533 Assert.assertSame(follower, newBehavior);
// The log now ends at index 4 and holds the two entries that were sent.
535 assertEquals("Next index", 5, log.last().getIndex() + 1);
536 assertEquals("Entry 3", entries.get(0), log.get(3));
537 assertEquals("Entry 4", entries.get(1), log.get(4));
539 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
540 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
// Helper defined outside this view; arguments are presumably (term, success, followerId,
// lastTerm, lastIndex) - TODO confirm.
542 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
546 * This test verifies that when a new AppendEntries message is received with
547 * new entries and the logs of the sender and receiver are out-of-sync that
548 * the log is first corrected by removing the out of sync entries from the
549 * log and then adding in the new entries sent with the AppendEntries message
// Verifies that when the leader's entries conflict with the receiver's log, the conflicting
// entry is replaced and the remaining entries are appended, followed by a successful reply.
552 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
553 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
555 MockRaftActorContext context = createActorContext();
557 // First set the receivers term to lower number
558 context.getTermInformation().update(1, "test");
560 // Prepare the receivers log
561 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
562 log.append(newReplicatedLogEntry(1, 0, "zero"));
563 log.append(newReplicatedLogEntry(1, 1, "one"));
564 log.append(newReplicatedLogEntry(1, 2, "two"));
566 context.setReplicatedLog(log);
568 // Prepare the entries to be sent with AppendEntries
569 List<ReplicatedLogEntry> entries = new ArrayList<>();
570 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
571 entries.add(newReplicatedLogEntry(2, 3, "three"));
573 // Send appendEntries with term 2, which is higher than the term set on
574 // the receiver above (1). prevLogIndex/prevLogTerm (1, 1) match the
575 // receiver's entry at index 1, while the first entry sent (index 2, term 2)
576 // conflicts with the receiver's entry at index 2 (term 1)
577 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
579 follower = createBehavior(context);
581 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
583 Assert.assertSame(follower, newBehavior);
585 // The entry at index 2 will be found out-of-sync with the leader
586 // and will be removed
587 // Then the two new entries will be added to the log
588 // Thus making the log to have 4 entries
589 assertEquals("Next index", 4, log.last().getIndex() + 1);
590 //assertEquals("Entry 2", entries.get(0), log.get(2));
// The entry before the conflict is untouched.
592 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
594 // Check that the entry at index 2 has the new data
595 assertEquals("Entry 2", entries.get(0), log.get(2));
597 assertEquals("Entry 3", entries.get(1), log.get(3));
// Helper defined outside this view; arguments are presumably (term, success, followerId,
// lastTerm, lastIndex) - TODO confirm.
599 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
// Verifies that, with the raft policy created by createRaftPolicy(false, true), a conflicting
// AppendEntries is answered with an unsuccessful reply whose extra flag is true.
// NOTE(review): the flag is presumably "force install snapshot", going by the test name - confirm.
603 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
604 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
606 MockRaftActorContext context = createActorContext();
608 // First set the receivers term to lower number
609 context.getTermInformation().update(1, "test");
611 // Prepare the receivers log
612 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
613 log.append(newReplicatedLogEntry(1, 0, "zero"));
614 log.append(newReplicatedLogEntry(1, 1, "one"));
615 log.append(newReplicatedLogEntry(1, 2, "two"));
617 context.setReplicatedLog(log);
619 // Prepare the entries to be sent with AppendEntries
620 List<ReplicatedLogEntry> entries = new ArrayList<>();
621 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
622 entries.add(newReplicatedLogEntry(2, 3, "three"));
624 // Send appendEntries with term 2, which is higher than the term set on
625 // the receiver above (1). prevLogIndex/prevLogTerm (1, 1) match the
626 // receiver's entry at index 1, while the first entry sent (index 2, term 2)
627 // conflicts with the receiver's entry at index 2 (term 1)
628 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
// The policy must be in place before the behavior is created and the message handled.
630 context.setRaftPolicy(createRaftPolicy(false, true));
631 follower = createBehavior(context);
633 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
635 Assert.assertSame(follower, newBehavior);
// Expected reply: term 2, unsuccessful, last term 1 / last index 2 (the receiver's original
// last entry), with the trailing flag set.
637 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
// Verifies that an AppendEntries naming a previous entry (index 3) that the receiver's log does
// not contain is answered with an unsuccessful reply, and that the behavior is unchanged.
641 public void testHandleAppendEntriesPreviousLogEntryMissing(){
642 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
644 MockRaftActorContext context = createActorContext();
646 // Prepare the receivers log
647 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
648 log.append(newReplicatedLogEntry(1, 0, "zero"));
649 log.append(newReplicatedLogEntry(1, 1, "one"));
650 log.append(newReplicatedLogEntry(1, 2, "two"));
652 context.setReplicatedLog(log);
654 // Prepare the entries to be sent with AppendEntries
655 List<ReplicatedLogEntry> entries = new ArrayList<>();
656 entries.add(newReplicatedLogEntry(1, 4, "four"));
// prevLogIndex is 3, but the receiver's log ends at index 2.
658 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
660 follower = createBehavior(context);
662 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
664 Assert.assertSame(follower, newBehavior);
// Expected reply: term 1, unsuccessful, last term 1 / last index 2.
666 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
// Verifies that re-sending an entry the receiver already holds does not duplicate it, and that a
// batch mixing an existing entry with a new one appends only the new entry.
670 public void testHandleAppendEntriesWithExistingLogEntry() {
671 logStart("testHandleAppendEntriesWithExistingLogEntry");
673 MockRaftActorContext context = createActorContext();
675 context.getTermInformation().update(1, "test");
677 // Prepare the receivers log
678 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
679 log.append(newReplicatedLogEntry(1, 0, "zero"));
680 log.append(newReplicatedLogEntry(1, 1, "one"));
682 context.setReplicatedLog(log);
684 // Send the last entry again.
685 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
687 follower = createBehavior(context);
689 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
// The log still ends at index 1.
691 assertEquals("Next index", 2, log.last().getIndex() + 1);
692 assertEquals("Entry 1", entries.get(0), log.get(1));
694 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
696 // Send the last entry again and also a new one.
698 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so that the next verification sees only the second one.
700 leaderActor.underlyingActor().clear();
701 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
// The log now ends at index 2.
703 assertEquals("Next index", 3, log.last().getIndex() + 1);
704 assertEquals("Entry 1", entries.get(0), log.get(1));
705 assertEquals("Entry 2", entries.get(1), log.get(2));
707 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
// Verifies that an AppendEntries whose previous entry (index 3, term 1) exists only as the
// snapshot index/term of an otherwise empty log is accepted, and the new entry acknowledged.
711 public void testHandleAppendEntriesAfterInstallingSnapshot(){
// Log the actual name of this test (the original string omitted "Entries").
712 logStart("testHandleAppendEntriesAfterInstallingSnapshot");
714 MockRaftActorContext context = createActorContext();
716 // Prepare the receivers log
717 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
719 // Set up a log as if it has been snapshotted
720 log.setSnapshotIndex(3);
721 log.setSnapshotTerm(1);
723 context.setReplicatedLog(log);
725 // Prepare the entries to be sent with AppendEntries
726 List<ReplicatedLogEntry> entries = new ArrayList<>();
727 entries.add(newReplicatedLogEntry(1, 4, "four"));
// prevLogIndex 3 / prevLogTerm 1 equal the snapshot index/term set above.
729 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
731 follower = createBehavior(context);
733 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
735 Assert.assertSame(follower, newBehavior);
// Expected reply: term 1, successful, last term 1 / last index 4.
737 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
742 * This test verifies that when InstallSnapshot is received by
743 * the follower it is applied correctly.
// Verifies that a snapshot delivered in chunks via InstallSnapshot is handed to followerActor as
// a single ApplySnapshot carrying the full state, that every chunk is acknowledged with a
// successful InstallSnapshotReply, and that the follower's snapshot tracker is null afterwards.
// NOTE(review): the declarations of offset, chunkSize and chunkIndex, and any chunkIndex
// bookkeeping around the loops, are on lines not shown in this view.
748 public void testHandleInstallSnapshot() throws Exception {
749 logStart("testHandleInstallSnapshot");
751 MockRaftActorContext context = createActorContext();
752 context.getTermInformation().update(1, "leader");
754 follower = createBehavior(context);
756 ByteString bsSnapshot = createSnapshot();
758 int snapshotLength = bsSnapshot.size();
// Ceiling division: one extra chunk when the length is not a multiple of chunkSize.
760 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
761 int lastIncludedIndex = 1;
763 InstallSnapshot lastInstallSnapshot = null;
// Feed the snapshot to the follower one chunk at a time.
765 for(int i = 0; i < totalChunks; i++) {
766 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
767 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
768 chunkData, chunkIndex, totalChunks);
769 follower.handleMessage(leaderActor, lastInstallSnapshot);
// Advance by the chunk size passed to getNextChunk instead of a hard-coded 50, so the offset
// cannot drift from the chunking if chunkSize changes.
770 offset = offset + chunkSize;
775 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
776 ApplySnapshot.class);
777 Snapshot snapshot = applySnapshot.getSnapshot();
778 assertNotNull(lastInstallSnapshot);
// The snapshot's index/term fields must mirror the last InstallSnapshot message.
779 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
780 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
781 snapshot.getLastAppliedTerm());
782 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
783 snapshot.getLastAppliedIndex());
784 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
// The chunks must add up to the original snapshot bytes.
785 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
// Election term and votedFor match what was set on the context at the start of the test.
786 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
787 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
// Invoke the success callback carried by ApplySnapshot before inspecting the replies.
788 applySnapshot.getCallback().onSuccess();
790 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
791 leaderActor, InstallSnapshotReply.class);
// One reply per chunk.
792 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
795 for(InstallSnapshotReply reply: replies) {
796 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
797 assertEquals("getTerm", 1, reply.getTerm());
798 assertEquals("isSuccess", true, reply.isSuccess());
799 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
802 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
807 * Verify that when an AppendEntries is sent to a follower during a snapshot install
808 * the Follower short-circuits the processing of the AppendEntries message.
// Verifies that while a chunked snapshot install is in progress, an AppendEntries is answered
// from the context's current log/term state without the follower ever reading the message's
// prevLogIndex.
// NOTE(review): chunkSize is declared on a line not shown in this view.
813 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
814 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
816 MockRaftActorContext context = createActorContext();
818 follower = createBehavior(context);
820 ByteString bsSnapshot = createSnapshot();
821 int snapshotLength = bsSnapshot.size();
// Ceiling division: one extra chunk when the length is not a multiple of chunkSize.
823 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
824 int lastIncludedIndex = 1;
826 // Check that snapshot installation is not in progress
827 assertNull(((Follower) follower).getSnapshotTracker());
829 // Make sure that we have more than 1 chunk to send
830 assertTrue(totalChunks > 1);
832 // Send an install snapshot with the first chunk to start the process of installing a snapshot
833 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
834 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
835 chunkData, 1, totalChunks));
837 // Check if snapshot installation is in progress now
838 assertNotNull(((Follower) follower).getSnapshotTracker());
840 // Send an append entry
// A Mockito mock is used so that the accessors the follower calls can be verified below; only
// getTerm() is stubbed, returning the context's current term.
841 AppendEntries appendEntries = mock(AppendEntries.class);
842 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
844 follower.handleMessage(leaderActor, appendEntries);
// The reply must reflect the context's current log position and term.
846 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
847 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
848 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
849 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
851 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
852 verify(appendEntries, never()).getPrevLogIndex();
/**
 * Sends a snapshot to the follower chunk by chunk, expects a FollowerInitialSyncUpStatus whose
 * isInitialSyncDone() is false, then sends an AppendEntries and expects a status whose
 * isInitialSyncDone() is true.
 *
 * NOTE(review): offset, chunkSize and chunkIndex are used below but their declarations, and the
 * end of the for-loop body, are not among the visible lines - confirm how they are initialised
 * and advanced.
 */
857 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
// NOTE(review): the logged label omits the "FollowedByAppendEntries" suffix of the method name.
858 logStart("testInitialSyncUpWithHandleInstallSnapshot");
860 MockRaftActorContext context = createActorContext();
862 follower = createBehavior(context);
864 ByteString bsSnapshot = createSnapshot();
866 int snapshotLength = bsSnapshot.size();
// Integer ceiling division: one extra chunk when the length is not a multiple of chunkSize.
868 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
869 int lastIncludedIndex = 1;
871 InstallSnapshot lastInstallSnapshot = null;
// Deliver one InstallSnapshot message per chunk.
873 for(int i = 0; i < totalChunks; i++) {
874 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
875 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
876 chunkData, chunkIndex, totalChunks);
877 follower.handleMessage(leaderActor, lastInstallSnapshot);
// Advances by the literal 50 rather than by chunkSize; presumably chunkSize is 50 - confirm.
878 offset = offset + 50;
// After the snapshot messages the follower actor should have been told the sync is not done.
883 FollowerInitialSyncUpStatus syncStatus =
884 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
886 assertFalse(syncStatus.isInitialSyncDone());
888 // Clear all the messages
889 followerActor.underlyingActor().clear();
// Put the context at index 101: last applied, commit index and the last log entry (term 1).
891 context.setLastApplied(101);
892 context.setCommitIndex(101);
893 setLastLogEntry(context, 1, 101,
894 new MockRaftActorContext.MockPayload(""));
896 List<ReplicatedLogEntry> entries = Arrays.asList(
897 newReplicatedLogEntry(2, 101, "foo"));
899 // The new commitIndex is 101
// NOTE(review): the sixth constructor argument below is 102 while the comment above says 101 -
// confirm which AppendEntries parameter carries the commit index.
900 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
901 follower.handleMessage(leaderActor, appendEntries);
// Messages were cleared above, so the first match here was produced by the AppendEntries.
903 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
905 assertTrue(syncStatus.isInitialSyncDone());
/**
 * Sends, as the very first snapshot message, an InstallSnapshot labelled chunk 3 of 3 and
 * expects the follower to answer with a failed InstallSnapshotReply (chunk index -1, term 1,
 * the follower's own id) and to hold no snapshot tracker afterwards.
 */
909 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
910 logStart("testHandleOutOfSequenceInstallSnapshot");
912 MockRaftActorContext context = createActorContext();
914 follower = createBehavior(context);
916 ByteString bsSnapshot = createSnapshot();
// The chunk payload is cut from offset 10 with a requested size of 50 bytes.
918 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
919 getNextChunk(bsSnapshot, 10, 50), 3, 3);
920 follower.handleMessage(leaderActor, installSnapshot);
922 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
923 InstallSnapshotReply.class);
// The reply must signal failure and carry -1 as the chunk index.
925 assertEquals("isSuccess", false, reply.isSuccess());
926 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
927 assertEquals("getTerm", 1, reply.getTerm());
928 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
930 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
/**
 * Measures how long it takes from creating the follower behavior until an ElectionTimeout
 * message reaches the follower actor, and asserts that this is shorter than the configured
 * election timeout interval.
 *
 * NOTE(review): the "no peers" precondition is presumably a property of the context returned
 * by createActorContext(); nothing in this method sets it up - confirm.
 */
934 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
935 MockRaftActorContext context = createActorContext();
// Start timing before the behavior is created.
937 Stopwatch stopwatch = Stopwatch.createStarted();
939 follower = createBehavior(context);
941 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
943 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// The timeout must have arrived sooner than one full election timeout interval.
945 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
/**
 * Configures a 100 ms election timeout interval and a raft policy built with
 * createRaftPolicy(false, false), creates the follower behavior, and asserts that no
 * ElectionTimeout message is collected by the follower actor.
 *
 * NOTE(review): the meaning of the two boolean policy arguments is not visible here;
 * presumably the first one disables automatic elections - confirm against createRaftPolicy.
 */
949 public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
950 MockRaftActorContext context = createActorContext();
// Anonymous config subclass that shortens the election timeout interval to 100 ms.
951 context.setConfigParams(new DefaultConfigParamsImpl(){
953 public FiniteDuration getElectionTimeOutInterval() {
954 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
958 context.setRaftPolicy(createRaftPolicy(false, false));
960 follower = createBehavior(context);
// NOTE(review): 500 is presumably a wait in milliseconds, i.e. several timeout intervals - confirm.
962 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
/**
 * Hands the follower an anonymous RaftRPC implementation and asserts that the election
 * timeout count reported by getElectionTimeoutCount(follower) is exactly 1.
 *
 * NOTE(review): the value returned by the anonymous getTerm() is not among the visible lines.
 */
966 public void testElectionScheduledWhenAnyRaftRPCReceived(){
967 MockRaftActorContext context = createActorContext();
968 follower = createBehavior(context);
// A bare RaftRPC stands in for "any" RPC type.
969 follower.handleMessage(leaderActor, new RaftRPC() {
971 public long getTerm() {
975 assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
/**
 * Hands the follower a plain String message and asserts that the election timeout count
 * reported by getElectionTimeoutCount(follower) stays at 0.
 */
979 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
980 MockRaftActorContext context = createActorContext();
981 follower = createBehavior(context);
// A String is not a RaftRPC, so it serves as the non-RPC message.
982 follower.handleMessage(leaderActor, "non-raft-rpc");
983 assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
/**
 * Returns a slice of {@code bs} for use as one snapshot chunk.
 *
 * NOTE(review): the declaration of {@code start} and the branch structure between the two
 * {@code if} statements are not among the visible lines; {@code start} is presumably the
 * {@code offset} argument - confirm.
 *
 * @param bs the complete serialized snapshot
 * @param offset position in {@code bs} for this chunk (presumably where the slice begins)
 * @param chunkSize requested chunk length in bytes
 * @return the bytes of {@code bs} from {@code start} to {@code start + size}
 */
986 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
987 int snapshotLength = bs.size();
989 int size = chunkSize;
// A chunk cannot be larger than the whole snapshot.
990 if (chunkSize > snapshotLength) {
991 size = snapshotLength;
// A chunk that would run past the end is shortened to the remaining bytes.
993 if ((start + chunkSize) > snapshotLength) {
994 size = snapshotLength - start;
997 return bs.substring(start, start + size);
/**
 * Convenience overload: delegates to the six-argument variant with
 * {@code expForceInstallSnapshot} set to {@code false}.
 *
 * @param expTerm expected reply term
 * @param expSuccess expected success flag
 * @param expFollowerId expected follower id
 * @param expLogLastTerm expected last log term
 * @param expLogLastIndex expected last log index
 */
1000 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1001 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1002 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
/**
 * Takes the first AppendEntriesReply collected by the leader actor and compares each of its
 * fields with the expected values. The payload version is compared with {@code payloadVersion},
 * which is not declared in this method.
 *
 * @param expTerm expected reply term
 * @param expSuccess expected success flag
 * @param expFollowerId expected follower id
 * @param expLogLastTerm expected last log term
 * @param expLogLastIndex expected last log index
 * @param expForceInstallSnapshot expected force-install-snapshot flag
 */
1005 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1006 String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1007 boolean expForceInstallSnapshot) {
1009 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1010 AppendEntriesReply.class);
// The assertion messages name the getter being checked, to make failures easy to read.
1012 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1013 assertEquals("getTerm", expTerm, reply.getTerm());
1014 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1015 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1016 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1017 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1018 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
/**
 * Builds a mock replicated log entry whose payload is a MockPayload wrapping {@code data}.
 *
 * @param term term passed to the entry
 * @param index index passed to the entry
 * @param data string wrapped in the mock payload
 * @return the new MockReplicatedLogEntry
 */
1022 private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1023 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1024 new MockRaftActorContext.MockPayload(data));
/**
 * Creates the snapshot fixture used by the install-snapshot tests: a three-entry map
 * ("1"->"A", "2"->"B", "3"->"C") converted with toByteString.
 *
 * @return the map as a ByteString
 */
1027 private ByteString createSnapshot(){
1028 HashMap<String, String> followerSnapshot = new HashMap<>();
1029 followerSnapshot.put("1", "A");
1030 followerSnapshot.put("2", "B");
1031 followerSnapshot.put("3", "C");
1033 return toByteString(followerSnapshot);
/**
 * Runs the superclass check and then additionally asserts the recorded vote: the candidate id
 * when {@code rpc} is a RequestVote, otherwise {@code null}.
 *
 * @param actorContext context whose term information is inspected
 * @param actorRef actor reference forwarded to the superclass check
 * @param rpc the RPC forwarded to the superclass check
 * @throws Exception if the superclass check throws
 */
1037 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1038 ActorRef actorRef, RaftRPC rpc) throws Exception {
1039 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
// Only a RequestVote carries a candidate id; for every other RPC no vote is expected.
1041 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
1042 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
/**
 * Expects an AppendEntriesReply at {@code replyActor} and asserts that it reports success.
 *
 * NOTE(review): the line that completes this signature is not among the visible lines.
 *
 * @param replyActor collector actor that should have received the reply
 */
1046 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
1048 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1049 assertEquals("isSuccess", true, reply.isSuccess());
/**
 * Follower subclass for tests that counts how many times scheduleElection is invoked.
 *
 * NOTE(review): the constructor body is not among the visible lines; it presumably passes the
 * context to the Follower constructor - confirm.
 */
1052 private static class TestFollower extends Follower {
// Number of scheduleElection calls seen so far.
1054 int electionTimeoutCount = 0;
1056 public TestFollower(RaftActorContext context) {
// Count the call first, then let the real Follower schedule the election.
1061 protected void scheduleElection(FiniteDuration interval) {
1062 electionTimeoutCount++;
1063 super.scheduleElection(interval);
// Accessor for the counter above.
1066 public int getElectionTimeoutCount() {
1067 return electionTimeoutCount;