1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.Snapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
32 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
37 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
39 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import scala.concurrent.duration.FiniteDuration;
43 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Message-collecting test actor that stands in for the follower's own actor; contexts built by
// createActorContext() are backed by it.
45 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
46 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
// Message-collecting test actor that stands in for the leader; tests pass it as the sender to
// handleMessage and then inspect the replies it collected.
48 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
49 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
// Behavior under test; assigned by each test method.
51 private RaftActorBehavior follower;
// Payload version stamped onto every context created by createActorContext(ActorRef).
53 private final short payloadVersion = 5;
// Cleanup hook (presumably the JUnit @After method; the annotation is not visible here).
// It only acts when a test actually assigned the follower behavior.
// NOTE(review): the statements inside the guard are outside this excerpt.
57 public void tearDown() throws Exception {
58 if(follower != null) {
// Factory for the behavior under test: wraps the supplied context in a TestFollower.
66 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
67 return new TestFollower(actorContext);
// Builds a mock context backed by the shared followerActor.
71 protected MockRaftActorContext createActorContext() {
72 return createActorContext(followerActor);
// Builds a MockRaftActorContext with id "follower" on the test actor system, using actorRef as
// its actor, and applies this test class's payloadVersion to it.
// NOTE(review): the return statement is outside this excerpt.
76 protected MockRaftActorContext createActorContext(ActorRef actorRef){
77 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
78 context.setPayloadVersion(payloadVersion );
// Returns the election-timeout count tracked by the behavior when it is a TestFollower.
// NOTE(review): the fallback for any other behavior type is outside this excerpt.
82 private int getElectionTimeoutCount(RaftActorBehavior follower){
83 if(follower instanceof TestFollower){
84 return ((TestFollower) follower).getElectionTimeoutCount();
// Verifies that creating a plain Follower results in an ElectionTimeout message arriving at
// followerActor. Six times the configured election timeout interval (in ms) is passed as the
// wait argument to expectFirstMatching.
90 public void testThatAnElectionTimeoutIsTriggered(){
91 MockRaftActorContext actorContext = createActorContext();
92 follower = new Follower(actorContext);
94 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
95 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
// Verifies that a Follower handed an ElectionTimeout message returns a Candidate behavior.
99 public void testHandleElectionTimeout(){
100 logStart("testHandleElectionTimeout");
102 follower = new Follower(createActorContext());
104 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
106 assertTrue(raftBehavior instanceof Candidate);
// Verifies that a RequestVote carrying the follower's current term is granted when the follower
// has not voted yet (votedFor is null), that the reply echoes the term, and that the election
// timeout count reported for the behavior is 1.
// NOTE(review): the declaration of `term` is not visible in this excerpt.
110 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
111 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
113 RaftActorContext context = createActorContext();
// Same term as the incoming request, with no vote recorded.
115 context.getTermInformation().update(term, null);
117 follower = createBehavior(context);
119 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
121 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
123 assertEquals("isVoteGranted", true, reply.isVoteGranted());
124 assertEquals("getTerm", term, reply.getTerm());
125 assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
// Verifies that a RequestVote carrying the follower's current term is denied when the follower
// already voted for a different candidate ("test" vs. "candidate"), and that the election
// timeout count reported for the behavior stays at 0.
// NOTE(review): the declaration of `term` is not visible in this excerpt.
129 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
130 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
132 RaftActorContext context = createActorContext();
// Vote already recorded for "test" in this term.
134 context.getTermInformation().update(term, "test");
136 follower = createBehavior(context);
138 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
140 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
142 assertEquals("isVoteGranted", false, reply.isVoteGranted());
143 assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
// Verifies the first AppendEntries handled by a follower whose log holds a single entry
// (term 1, index 100) with snapshot index 99: the follower reports that initial sync is not
// done and replies with success.
148 public void testHandleFirstAppendEntries() throws Exception {
149 logStart("testHandleFirstAppendEntries");
151 MockRaftActorContext context = createActorContext();
// Replace the context's default log content with one entry at index 100.
152 context.getReplicatedLog().clear(0,2);
153 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
154 context.getReplicatedLog().setSnapshotIndex(99);
156 List<ReplicatedLogEntry> entries = Arrays.asList(
157 newReplicatedLogEntry(2, 101, "foo"));
159 Assert.assertEquals(1, context.getReplicatedLog().size());
161 // The new commitIndex is 101
162 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
164 follower = createBehavior(context);
165 follower.handleMessage(leaderActor, appendEntries);
// The sync status is delivered to the follower's own actor, the reply to the sender.
167 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
168 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
170 assertFalse(syncStatus.isInitialSyncDone());
171 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies that a first AppendEntries sent with prevLogIndex and prevLogTerm of -1 is rejected
// when handled against the mock context's default log (its content is not visible in this
// excerpt): the follower reports that initial sync is not done and replies with failure.
175 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
// Label matches the method name so the log output identifies this test unambiguously.
176 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOne");
178 MockRaftActorContext context = createActorContext();
180 List<ReplicatedLogEntry> entries = Arrays.asList(
181 newReplicatedLogEntry(2, 101, "foo"));
183 // The new commitIndex is 101
184 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
186 follower = createBehavior(context);
187 follower.handleMessage(leaderActor, appendEntries);
// The sync status is delivered to the follower's own actor, the reply to the sender.
189 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
190 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
192 assertFalse(syncStatus.isInitialSyncDone());
193 assertFalse("append entries reply should be false", reply.isSuccess());
// Verifies that a first AppendEntries with prevLogIndex/prevLogTerm of -1 succeeds when the
// replicatedToAllIndex it carries (100) is present in the follower's log: the log holds one
// entry (term 1, index 100) with snapshot index 99. Initial sync is still reported as not done.
197 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
// Label matches the method name so the log output identifies this test unambiguously.
198 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
200 MockRaftActorContext context = createActorContext();
// Replace the context's default log content with one entry at index 100.
201 context.getReplicatedLog().clear(0,2);
202 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
203 context.getReplicatedLog().setSnapshotIndex(99);
205 List<ReplicatedLogEntry> entries = Arrays.asList(
206 newReplicatedLogEntry(2, 101, "foo"));
208 // The new commitIndex is 101
209 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
211 follower = createBehavior(context);
212 follower.handleMessage(leaderActor, appendEntries);
// The sync status is delivered to the follower's own actor, the reply to the sender.
214 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
215 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
217 assertFalse(syncStatus.isInitialSyncDone());
218 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies that a first AppendEntries with prevLogIndex/prevLogTerm of -1 succeeds when the
// replicatedToAllIndex it carries (100) is covered by the follower's snapshot: the log is
// emptied and the snapshot index set to 100. Initial sync is still reported as not done.
222 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
// Label matches the method name so the log output identifies this test unambiguously.
223 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
225 MockRaftActorContext context = createActorContext();
// No live log entries; index 100 exists only as the snapshot index.
226 context.getReplicatedLog().clear(0,2);
227 context.getReplicatedLog().setSnapshotIndex(100);
229 List<ReplicatedLogEntry> entries = Arrays.asList(
230 newReplicatedLogEntry(2, 101, "foo"));
232 // The new commitIndex is 101
233 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
235 follower = createBehavior(context);
236 follower.handleMessage(leaderActor, appendEntries);
// The sync status is delivered to the follower's own actor, the reply to the sender.
238 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
239 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
241 assertFalse(syncStatus.isInitialSyncDone());
242 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies that a first AppendEntries with prevLogIndex/prevLogTerm of -1 is rejected when the
// replicatedToAllIndex (100) is covered by the snapshot but the single new entry is at index
// 105, leaving a gap after the snapshot index. Initial sync is reported as not done.
246 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
// Label matches the method name so the log output identifies this test unambiguously.
247 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing");
249 MockRaftActorContext context = createActorContext();
// No live log entries; index 100 exists only as the snapshot index.
250 context.getReplicatedLog().clear(0,2);
251 context.getReplicatedLog().setSnapshotIndex(100);
253 List<ReplicatedLogEntry> entries = Arrays.asList(
254 newReplicatedLogEntry(2, 105, "foo"));
256 // The new commitIndex is 105
257 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
259 follower = createBehavior(context);
260 follower.handleMessage(leaderActor, appendEntries);
// The sync status is delivered to the follower's own actor, the reply to the sender.
262 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
263 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
265 assertFalse(syncStatus.isInitialSyncDone());
266 assertFalse("append entries reply should be false", reply.isSuccess());
// Walks a follower through initial sync-up in three steps:
//   1. a first AppendEntries yields a FollowerInitialSyncUpStatus with initialSyncDone == false;
//   2. after the context is advanced to index 101, the next AppendEntries yields a status with
//      initialSyncDone == true;
//   3. re-sending that same message produces no further status message.
270 public void testHandleSyncUpAppendEntries() throws Exception {
271 logStart("testHandleSyncUpAppendEntries");
273 MockRaftActorContext context = createActorContext();
275 List<ReplicatedLogEntry> entries = Arrays.asList(
276 newReplicatedLogEntry(2, 101, "foo"));
278 // The new commitIndex is 101
279 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
281 follower = createBehavior(context);
282 follower.handleMessage(leaderActor, appendEntries);
284 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
286 assertFalse(syncStatus.isInitialSyncDone());
288 // Clear all the messages
289 followerActor.underlyingActor().clear();
// Bring the follower's applied/committed state and last log entry up to index 101.
291 context.setLastApplied(101);
292 context.setCommitIndex(101);
293 setLastLogEntry(context, 1, 101,
294 new MockRaftActorContext.MockPayload(""));
296 entries = Arrays.asList(
297 newReplicatedLogEntry(2, 101, "foo"));
299 // The new commitIndex is 102
300 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
301 follower.handleMessage(leaderActor, appendEntries);
303 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
305 assertTrue(syncStatus.isInitialSyncDone());
307 followerActor.underlyingActor().clear();
309 // Sending the same message again should not generate another message
311 follower.handleMessage(leaderActor, appendEntries);
// getFirstMatching (unlike expectFirstMatching) is used so that absence can be asserted.
313 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
315 assertNull(syncStatus);
// Verifies that when the leader id changes ("leader-1" -> "leader-2") before initial sync-up
// has completed, the follower emits a fresh FollowerInitialSyncUpStatus that still reports
// initialSyncDone == false.
320 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
321 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
323 MockRaftActorContext context = createActorContext();
325 List<ReplicatedLogEntry> entries = Arrays.asList(
326 newReplicatedLogEntry(2, 101, "foo"));
328 // The new commitIndex is 101
329 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
331 follower = createBehavior(context);
332 follower.handleMessage(leaderActor, appendEntries);
334 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
336 assertFalse(syncStatus.isInitialSyncDone());
338 // Clear all the messages
339 followerActor.underlyingActor().clear();
341 context.setLastApplied(100);
342 setLastLogEntry(context, 1, 100,
343 new MockRaftActorContext.MockPayload(""));
345 entries = Arrays.asList(
346 newReplicatedLogEntry(2, 101, "foo"));
348 // leader-2 is becoming the leader now; it sends prevLogIndex 45 and commitIndex 46
349 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
350 follower.handleMessage(leaderActor, appendEntries);
352 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
354 // We get a new message saying initial status is not done
355 assertFalse(syncStatus.isInitialSyncDone());
// Verifies that a leader change after initial sync-up completed resets the sync status:
// the follower first reports not-done, then done once it is caught up with "leader-1", and
// finally reports not-done again when "leader-2" starts sending AppendEntries.
361 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
362 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
364 MockRaftActorContext context = createActorContext();
366 List<ReplicatedLogEntry> entries = Arrays.asList(
367 newReplicatedLogEntry(2, 101, "foo"));
369 // The new commitIndex is 101
370 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
372 follower = createBehavior(context);
373 follower.handleMessage(leaderActor, appendEntries);
375 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
377 assertFalse(syncStatus.isInitialSyncDone());
379 // Clear all the messages
380 followerActor.underlyingActor().clear();
// Bring the follower's applied/committed state and last log entry up to index 101.
382 context.setLastApplied(101);
383 context.setCommitIndex(101);
384 setLastLogEntry(context, 1, 101,
385 new MockRaftActorContext.MockPayload(""));
387 entries = Arrays.asList(
388 newReplicatedLogEntry(2, 101, "foo"));
390 // The new commitIndex is 102
391 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
392 follower.handleMessage(leaderActor, appendEntries);
394 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
396 assertTrue(syncStatus.isInitialSyncDone());
398 // Clear all the messages
399 followerActor.underlyingActor().clear();
401 context.setLastApplied(100);
402 setLastLogEntry(context, 1, 100,
403 new MockRaftActorContext.MockPayload(""));
405 entries = Arrays.asList(
406 newReplicatedLogEntry(2, 101, "foo"));
408 // leader-2 is becoming the leader now; it sends prevLogIndex 45 and commitIndex 46
409 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
410 follower.handleMessage(leaderActor, appendEntries);
412 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
414 // We get a new message saying initial status is not done
415 assertFalse(syncStatus.isInitialSyncDone());
421 * This test verifies that when an AppendEntries RPC is received by a RaftActor
422 * with a commitIndex that is greater than what has been applied to the
423 * state machine of the RaftActor, the RaftActor applies the state and
424 * sets its current applied state to the commitIndex of the sender.
// Starts with lastApplied == 100 and a last log entry at index 100, sends an AppendEntries whose
// commit index is 101, and asserts that the context's lastApplied advances to 101.
429 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
430 logStart("testHandleAppendEntriesWithNewerCommitIndex");
432 MockRaftActorContext context = createActorContext();
434 context.setLastApplied(100);
435 setLastLogEntry(context, 1, 100,
436 new MockRaftActorContext.MockPayload(""));
437 context.getReplicatedLog().setSnapshotIndex(99);
439 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
440 newReplicatedLogEntry(2, 101, "foo"));
442 // The new commitIndex is 101
443 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
445 follower = createBehavior(context);
446 follower.handleMessage(leaderActor, appendEntries);
448 assertEquals("getLastApplied", 101L, context.getLastApplied());
452 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
453 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
454 * then the RaftActor does not change its state and it returns a failure.
// Sends an AppendEntries (term 100, prevLogIndex 0, prevLogTerm 0, no entries) to a follower
// whose term was set to 95, and asserts that the follower keeps its own behavior instance and
// replies with isSuccess == false.
459 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
460 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
462 MockRaftActorContext context = createActorContext();
464 // First set the receivers term to lower number
465 context.getTermInformation().update(95, "test");
467 // AppendEntries is now sent with a bigger term
468 // this will set the receivers term to be the same as the sender's term
469 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
471 follower = createBehavior(context);
473 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch is expected: the very same instance must be returned.
475 Assert.assertSame(follower, newBehavior);
477 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
478 AppendEntriesReply.class);
480 assertEquals("isSuccess", false, reply.isSuccess());
484 * This test verifies that when a new AppendEntries message is received with
485 * new entries and the logs of the sender and receiver match that the new
486 * entries get added to the log and the log is incremented by the number of
487 * entries received in appendEntries
// Gives the follower a log with entries at indices 0..2 (term 1), sends an AppendEntries with
// entries 3 and 4, and asserts that both were appended, that the behavior is unchanged, that the
// leader id and leader payload version from the message are exposed by the behavior, and that
// the reply sent to the leader passes expectAndVerifyAppendEntriesReply.
492 public void testHandleAppendEntriesAddNewEntries() {
493 logStart("testHandleAppendEntriesAddNewEntries");
495 MockRaftActorContext context = createActorContext();
497 // First set the receivers term to lower number
498 context.getTermInformation().update(1, "test");
500 // Prepare the receivers log
501 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
502 log.append(newReplicatedLogEntry(1, 0, "zero"));
503 log.append(newReplicatedLogEntry(1, 1, "one"));
504 log.append(newReplicatedLogEntry(1, 2, "two"));
506 context.setReplicatedLog(log);
508 // Prepare the entries to be sent with AppendEntries
509 List<ReplicatedLogEntry> entries = new ArrayList<>();
510 entries.add(newReplicatedLogEntry(1, 3, "three"));
511 entries.add(newReplicatedLogEntry(1, 4, "four"));
513 // Send appendEntries with the same term as was set on the receiver
514 // before the new behavior was created (1 in this case)
515 // This will not work for a Candidate because as soon as a Candidate
516 // is created it increments the term
517 short leaderPayloadVersion = 10;
518 String leaderId = "leader-1";
519 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
521 follower = createBehavior(context);
523 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch is expected: the very same instance must be returned.
525 Assert.assertSame(follower, newBehavior);
527 assertEquals("Next index", 5, log.last().getIndex() + 1);
528 assertEquals("Entry 3", entries.get(0), log.get(3));
529 assertEquals("Entry 4", entries.get(1), log.get(4));
531 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
532 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
534 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
538 * This test verifies that when a new AppendEntries message is received with
539 * new entries and the logs of the sender and receiver are out-of-sync that
540 * the log is first corrected by removing the out of sync entries from the
541 * log and then adding in the new entries sent with the AppendEntries message
// Gives the follower a log with entries at indices 0..2 (term 1), then sends an AppendEntries
// (term 2, prevLogIndex 1) carrying a conflicting entry for index 2 plus a new entry for index 3.
// Asserts that index 1 is untouched, that indices 2 and 3 now hold the leader's entries, and
// that the behavior instance is unchanged.
544 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
545 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
547 MockRaftActorContext context = createActorContext();
549 // First set the receivers term to lower number
550 context.getTermInformation().update(1, "test");
552 // Prepare the receivers log
553 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
554 log.append(newReplicatedLogEntry(1, 0, "zero"));
555 log.append(newReplicatedLogEntry(1, 1, "one"));
556 log.append(newReplicatedLogEntry(1, 2, "two"));
558 context.setReplicatedLog(log);
560 // Prepare the entries to be sent with AppendEntries
561 List<ReplicatedLogEntry> entries = new ArrayList<>();
562 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
563 entries.add(newReplicatedLogEntry(2, 3, "three"));
565 // Send appendEntries with term 2, which is newer than the term set on the
566 // receiver before the new behavior was created (1 in this case)
567 // This will not work for a Candidate because as soon as a Candidate
568 // is created it increments the term
569 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
571 follower = createBehavior(context);
573 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch is expected: the very same instance must be returned.
575 Assert.assertSame(follower, newBehavior);
577 // The entry at index 2 will be found out-of-sync with the leader
578 // and will be removed
579 // Then the two new entries will be added to the log
580 // Thus making the log to have 4 entries
581 assertEquals("Next index", 4, log.last().getIndex() + 1);
582 //assertEquals("Entry 2", entries.get(0), log.get(2));
584 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
586 // Check that the entry at index 2 has the new data
587 assertEquals("Entry 2", entries.get(0), log.get(2));
589 assertEquals("Entry 3", entries.get(1), log.get(3));
591 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
// Gives the follower a log with entries at indices 0..2 and sends an AppendEntries whose
// prevLogIndex is 3, an index the follower does not have. Asserts that the behavior instance is
// unchanged and that a failure reply is sent (checked via expectAndVerifyAppendEntriesReply).
595 public void testHandleAppendEntriesPreviousLogEntryMissing(){
596 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
598 MockRaftActorContext context = createActorContext();
600 // Prepare the receivers log
601 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
602 log.append(newReplicatedLogEntry(1, 0, "zero"));
603 log.append(newReplicatedLogEntry(1, 1, "one"));
604 log.append(newReplicatedLogEntry(1, 2, "two"));
606 context.setReplicatedLog(log);
608 // Prepare the entries to be sent with AppendEntries
609 List<ReplicatedLogEntry> entries = new ArrayList<>();
610 entries.add(newReplicatedLogEntry(1, 4, "four"));
612 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
614 follower = createBehavior(context);
616 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch is expected: the very same instance must be returned.
618 Assert.assertSame(follower, newBehavior);
620 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
// Verifies that re-sending an entry the follower already holds does not duplicate it:
// first the existing last entry (index 1) is sent again on its own, then again together with a
// new entry at index 2. After each message the log's last index and the affected entries are
// checked, and a success reply is verified.
624 public void testHandleAppendEntriesWithExistingLogEntry() {
625 logStart("testHandleAppendEntriesWithExistingLogEntry");
627 MockRaftActorContext context = createActorContext();
629 context.getTermInformation().update(1, "test");
631 // Prepare the receivers log
632 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
633 log.append(newReplicatedLogEntry(1, 0, "zero"));
634 log.append(newReplicatedLogEntry(1, 1, "one"));
636 context.setReplicatedLog(log);
638 // Send the last entry again.
639 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
641 follower = createBehavior(context);
643 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
645 assertEquals("Next index", 2, log.last().getIndex() + 1);
646 assertEquals("Entry 1", entries.get(0), log.get(1));
648 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
650 // Send the last entry again and also a new one.
652 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so the verification below sees only the reply to the second message.
654 leaderActor.underlyingActor().clear();
655 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
657 assertEquals("Next index", 3, log.last().getIndex() + 1);
658 assertEquals("Entry 1", entries.get(0), log.get(1));
659 assertEquals("Entry 2", entries.get(1), log.get(2));
661 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
// Verifies that a follower whose log is empty but carries snapshot index 3 / snapshot term 1
// accepts an AppendEntries with prevLogIndex 3 and a new entry at index 4: the behavior
// instance is unchanged and a success reply is verified.
665 public void testHandleAppendEntriesAfterInstallingSnapshot(){
// Label matches the method name so the log output identifies this test unambiguously.
666 logStart("testHandleAppendEntriesAfterInstallingSnapshot");
668 MockRaftActorContext context = createActorContext();
670 // Prepare the receivers log
671 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
673 // Set up a log as if it has been snapshotted
674 log.setSnapshotIndex(3);
675 log.setSnapshotTerm(1);
677 context.setReplicatedLog(log);
679 // Prepare the entries to be sent with AppendEntries
680 List<ReplicatedLogEntry> entries = new ArrayList<>();
681 entries.add(newReplicatedLogEntry(1, 4, "four"));
683 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
685 follower = createBehavior(context);
687 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch is expected: the very same instance must be returned.
689 Assert.assertSame(follower, newBehavior);
691 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
696 * This test verifies that when InstallSnapshot is received by
697 * the follower it is applied correctly.
// Feeds a snapshot to the follower chunk by chunk via InstallSnapshot messages, then asserts:
//   - an ApplySnapshot reaches followerActor whose Snapshot matches the last InstallSnapshot's
//     index/term and the original bytes;
//   - the leader received one successful InstallSnapshotReply per chunk, with increasing chunk
//     indexes;
//   - the follower's snapshot tracker is null afterwards.
// NOTE(review): the declarations of `chunkSize`, `offset` and `chunkIndex`, and the remainder of
// the loop body, are not visible in this excerpt.
702 public void testHandleInstallSnapshot() throws Exception {
703 logStart("testHandleInstallSnapshot");
705 MockRaftActorContext context = createActorContext();
707 follower = createBehavior(context);
709 ByteString bsSnapshot = createSnapshot();
711 int snapshotLength = bsSnapshot.size();
// Ceiling division: a partial trailing chunk counts as one more chunk.
713 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
714 int lastIncludedIndex = 1;
716 InstallSnapshot lastInstallSnapshot = null;
718 for(int i = 0; i < totalChunks; i++) {
719 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
720 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
721 chunkData, chunkIndex, totalChunks);
722 follower.handleMessage(leaderActor, lastInstallSnapshot);
723 offset = offset + 50;
728 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
729 ApplySnapshot.class);
730 Snapshot snapshot = applySnapshot.getSnapshot();
731 assertNotNull(lastInstallSnapshot);
732 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
733 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
734 snapshot.getLastAppliedTerm());
735 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
736 snapshot.getLastAppliedIndex());
737 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
738 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
740 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
741 leaderActor, InstallSnapshotReply.class);
742 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
745 for(InstallSnapshotReply reply: replies) {
746 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
747 assertEquals("getTerm", 1, reply.getTerm());
748 assertEquals("isSuccess", true, reply.isSuccess());
749 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
752 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
757 * Verify that when an AppendEntries is sent to a follower during a snapshot install
758 * the Follower short-circuits the processing of the AppendEntries message.
// Starts a multi-chunk snapshot install by sending only the first chunk, then sends a mocked
// AppendEntries. Asserts that the reply reflects the follower's current log/term state and that
// the follower never asked the message for its prevLogIndex.
// NOTE(review): the declaration of `chunkSize` is not visible in this excerpt.
763 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
764 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
766 MockRaftActorContext context = createActorContext();
768 follower = createBehavior(context);
770 ByteString bsSnapshot = createSnapshot();
771 int snapshotLength = bsSnapshot.size();
// Ceiling division: a partial trailing chunk counts as one more chunk.
773 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
774 int lastIncludedIndex = 1;
776 // Check that snapshot installation is not in progress
777 assertNull(((Follower) follower).getSnapshotTracker());
779 // Make sure that we have more than 1 chunk to send
780 assertTrue(totalChunks > 1);
782 // Send an install snapshot with the first chunk to start the process of installing a snapshot
783 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
784 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
785 chunkData, 1, totalChunks));
787 // Check if snapshot installation is in progress now
788 assertNotNull(((Follower) follower).getSnapshotTracker());
790 // Send an append entry
// A Mockito mock is used so that accessor calls made by the follower can be verified below.
791 AppendEntries appendEntries = mock(AppendEntries.class);
792 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
794 follower.handleMessage(leaderActor, appendEntries);
796 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
797 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
798 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
799 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
801 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
802 verify(appendEntries, never()).getPrevLogIndex();
// Verifies the initial sync-up status across a snapshot install followed by an AppendEntries:
// after all snapshot chunks are handled the follower reports initialSyncDone == false, and once
// the context is advanced to index 101 the next AppendEntries yields initialSyncDone == true.
// NOTE(review): the declarations of `chunkSize`, `offset` and `chunkIndex`, and the remainder of
// the loop body, are not visible in this excerpt.
807 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
// Label matches the method name so the log output identifies this test unambiguously.
808 logStart("testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries");
810 MockRaftActorContext context = createActorContext();
812 follower = createBehavior(context);
814 ByteString bsSnapshot = createSnapshot();
816 int snapshotLength = bsSnapshot.size();
// Ceiling division: a partial trailing chunk counts as one more chunk.
818 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
819 int lastIncludedIndex = 1;
821 InstallSnapshot lastInstallSnapshot = null;
823 for(int i = 0; i < totalChunks; i++) {
824 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
825 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
826 chunkData, chunkIndex, totalChunks);
827 follower.handleMessage(leaderActor, lastInstallSnapshot);
828 offset = offset + 50;
833 FollowerInitialSyncUpStatus syncStatus =
834 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
836 assertFalse(syncStatus.isInitialSyncDone());
838 // Clear all the messages
839 followerActor.underlyingActor().clear();
// Bring the follower's applied/committed state and last log entry up to index 101.
841 context.setLastApplied(101);
842 context.setCommitIndex(101);
843 setLastLogEntry(context, 1, 101,
844 new MockRaftActorContext.MockPayload(""));
846 List<ReplicatedLogEntry> entries = Arrays.asList(
847 newReplicatedLogEntry(2, 101, "foo"));
849 // The new commitIndex is 102
850 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
851 follower.handleMessage(leaderActor, appendEntries);
853 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
855 assertTrue(syncStatus.isInitialSyncDone());
// Sends chunk 3 of 3 as the very first InstallSnapshot message and asserts that the follower
// rejects it: the reply is unsuccessful with chunk index -1, carries term 1 and the follower's
// id, and no snapshot tracker is left behind.
859 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
860 logStart("testHandleOutOfSequenceInstallSnapshot");
862 MockRaftActorContext context = createActorContext();
864 follower = createBehavior(context);
866 ByteString bsSnapshot = createSnapshot();
868 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
869 getNextChunk(bsSnapshot, 10, 50), 3, 3);
870 follower.handleMessage(leaderActor, installSnapshot);
872 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
873 InstallSnapshotReply.class);
875 assertEquals("isSuccess", false, reply.isSuccess());
876 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
877 assertEquals("getTerm", 1, reply.getTerm());
878 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
880 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
/**
 * Creates a follower from the default test context and checks that followerActor collects an ElectionTimeout
 * in less time than the configured election timeout interval.
 * NOTE(review): the "no peers" condition in the test name is presumably a property of createActorContext();
 * nothing in this method configures peers - confirm.
 */
884 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
885 MockRaftActorContext context = createActorContext();
// Start timing before the behavior is created so that the measurement includes its construction.
887 Stopwatch stopwatch = Stopwatch.createStarted();
889 follower = createBehavior(context);
891 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
893 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// Elapsed wall-clock milliseconds must be strictly below the election timeout interval in milliseconds.
895 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
/**
 * Creates a follower with a 100 ms election timeout interval and a raft policy built by
 * createRaftPolicy(false, false), then checks that followerActor collects no ElectionTimeout message.
 */
899 public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
900 MockRaftActorContext context = createActorContext();
// Anonymous config subclass that replaces only the election timeout interval.
901 context.setConfigParams(new DefaultConfigParamsImpl(){
903 public FiniteDuration getElectionTimeOutInterval() {
904 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): presumably one of the two false flags disables automatic elections - verify against
// createRaftPolicy.
908 context.setRaftPolicy(createRaftPolicy(false, false));
910 follower = createBehavior(context);
// NOTE(review): 500 is presumably a wait in milliseconds, i.e. longer than the 100 ms interval above - confirm
// against MessageCollectorActor.assertNoneMatching.
912 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
/**
 * Hands the follower an anonymous RaftRPC implementation and checks that the election-timeout count read via
 * getElectionTimeoutCount(follower) is 1 afterwards.
 */
916 public void testElectionScheduledWhenAnyRaftRPCReceived(){
917 MockRaftActorContext context = createActorContext();
918 follower = createBehavior(context);
// The message is an ad-hoc RaftRPC rather than one of the concrete message classes used elsewhere in this file.
919 follower.handleMessage(leaderActor, new RaftRPC() {
921 public long getTerm() {
925 assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
/**
 * Hands the follower a plain String message and checks that the election-timeout count read via
 * getElectionTimeoutCount(follower) is 0 afterwards.
 */
929 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
930 MockRaftActorContext context = createActorContext();
931 follower = createBehavior(context);
// A String is used as an example of a message that is not a RaftRPC.
932 follower.handleMessage(leaderActor, "non-raft-rpc");
933 assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
/**
 * Returns a slice of {@code bs} that begins at {@code start} and is {@code size} bytes long, where {@code size}
 * is {@code chunkSize} reduced as needed by the checks against the length of {@code bs}.
 *
 * @param bs the bytes to slice
 * @param offset position in {@code bs} requested by the caller
 * @param chunkSize requested chunk length
 * @return {@code bs.substring(start, start + size)}
 */
936 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
937 int snapshotLength = bs.size();
// NOTE(review): the declaration of start is not visible in this excerpt; presumably it is initialised from
// offset - confirm.
939 int size = chunkSize;
// A chunk larger than the whole input is limited to the input's length.
940 if (chunkSize > snapshotLength) {
941 size = snapshotLength;
// A chunk that would run past the end of the input is shortened to the bytes remaining after start.
943 if ((start + chunkSize) > snapshotLength) {
944 size = snapshotLength - start;
// NOTE(review): if the first branch's size (the full length) is kept while start is greater than 0, start + size
// exceeds the input length - confirm how the two checks are combined.
947 return bs.substring(start, start + size);
/**
 * Takes the first AppendEntriesReply collected by leaderActor and asserts each of its fields against the
 * expected values.
 *
 * @param expTerm expected value of getTerm()
 * @param expSuccess expected value of isSuccess()
 * @param expFollowerId expected value of getFollowerId()
 * @param expLogLastTerm expected value of getLogLastTerm()
 * @param expLogLastIndex expected value of getLogLastIndex()
 */
950 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
951 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
953 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
954 AppendEntriesReply.class);
956 assertEquals("isSuccess", expSuccess, reply.isSuccess());
957 assertEquals("getTerm", expTerm, reply.getTerm());
958 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
959 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
960 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
// payloadVersion is not a parameter; it is resolved from outside this method.
961 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
/**
 * Builds a MockReplicatedLogEntry for the given term and index whose payload is a MockPayload wrapping
 * {@code data}.
 *
 * @param term term passed to the entry
 * @param index index passed to the entry
 * @param data string wrapped in the entry's MockPayload
 * @return the new log entry
 */
964 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
965 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
966 new MockRaftActorContext.MockPayload(data));
/**
 * Builds the snapshot bytes used by the install-snapshot tests: a three-entry map ("1"->"A", "2"->"B",
 * "3"->"C") converted with toByteString.
 *
 * @return the result of toByteString for that map
 */
969 private ByteString createSnapshot(){
970 HashMap<String, String> followerSnapshot = new HashMap<>();
971 followerSnapshot.put("1", "A");
972 followerSnapshot.put("2", "B");
973 followerSnapshot.put("3", "C");
975 return toByteString(followerSnapshot);
/**
 * Runs the superclass check of the same name and then additionally asserts the votedFor value recorded in the
 * context's term information.
 *
 * @param actorContext context whose term information is inspected
 * @param actorRef passed through to the superclass check
 * @param rpc the message under test
 */
979 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
980 ActorRef actorRef, RaftRPC rpc) throws Exception {
981 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
// votedFor is expected to be the candidate id when rpc is a RequestVote, and null for any other RaftRPC.
983 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
984 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
/**
 * Takes the first AppendEntriesReply collected by {@code replyActor} and asserts that it reports success.
 *
 * @param replyActor collector actor expected to have received the reply
 */
988 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
990 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
991 assertEquals("isSuccess", true, reply.isSuccess());
/**
 * Follower subclass that counts how many times scheduleElection is invoked.
 */
994 private static class TestFollower extends Follower {
// Number of scheduleElection calls seen so far.
996 int electionTimeoutCount = 0;
998 public TestFollower(RaftActorContext context) {
// Counts the call, then delegates to Follower's scheduleElection with the same interval.
1003 protected void scheduleElection(FiniteDuration interval) {
1004 electionTimeoutCount++;
1005 super.scheduleElection(interval);
// Returns the number of scheduleElection calls counted so far.
1008 public int getElectionTimeoutCount() {
1009 return electionTimeoutCount;