2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.Props;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Stopwatch;
24 import com.google.protobuf.ByteString;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftActorContext;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.Snapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
40 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import scala.concurrent.duration.FiniteDuration;
51 public class FollowerTest extends AbstractRaftActorBehaviorTest {
53 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
54 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
56 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
57 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
59 private RaftActorBehavior follower;
61 private final short payloadVersion = 5;
65 public void tearDown() throws Exception {
66 if(follower != null) {
74 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
75 return new Follower(actorContext);
79 protected MockRaftActorContext createActorContext() {
80 return createActorContext(followerActor);
84 protected MockRaftActorContext createActorContext(ActorRef actorRef){
85 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
86 context.setPayloadVersion(payloadVersion );
91 public void testThatAnElectionTimeoutIsTriggered(){
92 MockRaftActorContext actorContext = createActorContext();
93 follower = new Follower(actorContext);
95 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
96 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
100 public void testHandleElectionTimeout(){
101 logStart("testHandleElectionTimeout");
103 follower = new Follower(createActorContext());
105 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
107 assertTrue(raftBehavior instanceof Candidate);
111 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
112 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
114 RaftActorContext context = createActorContext();
116 context.getTermInformation().update(term, null);
118 follower = createBehavior(context);
120 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
122 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
124 assertEquals("isVoteGranted", true, reply.isVoteGranted());
125 assertEquals("getTerm", term, reply.getTerm());
129 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
130 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
132 RaftActorContext context = createActorContext();
134 context.getTermInformation().update(term, "test");
136 follower = createBehavior(context);
138 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
140 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
142 assertEquals("isVoteGranted", false, reply.isVoteGranted());
147 public void testHandleFirstAppendEntries() throws Exception {
148 logStart("testHandleFirstAppendEntries");
150 MockRaftActorContext context = createActorContext();
151 context.getReplicatedLog().clear(0,2);
152 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
153 context.getReplicatedLog().setSnapshotIndex(99);
155 List<ReplicatedLogEntry> entries = Arrays.asList(
156 newReplicatedLogEntry(2, 101, "foo"));
158 Assert.assertEquals(1, context.getReplicatedLog().size());
160 // The new commitIndex is 101
161 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
163 follower = createBehavior(context);
164 follower.handleMessage(leaderActor, appendEntries);
166 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
167 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
169 assertFalse(syncStatus.isInitialSyncDone());
170 assertTrue("append entries reply should be true", reply.isSuccess());
174 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
175 logStart("testHandleFirstAppendEntries");
177 MockRaftActorContext context = createActorContext();
179 List<ReplicatedLogEntry> entries = Arrays.asList(
180 newReplicatedLogEntry(2, 101, "foo"));
182 // The new commitIndex is 101
183 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
185 follower = createBehavior(context);
186 follower.handleMessage(leaderActor, appendEntries);
188 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
189 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
191 assertFalse(syncStatus.isInitialSyncDone());
192 assertFalse("append entries reply should be false", reply.isSuccess());
196 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
197 logStart("testHandleFirstAppendEntries");
199 MockRaftActorContext context = createActorContext();
200 context.getReplicatedLog().clear(0,2);
201 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
202 context.getReplicatedLog().setSnapshotIndex(99);
204 List<ReplicatedLogEntry> entries = Arrays.asList(
205 newReplicatedLogEntry(2, 101, "foo"));
207 // The new commitIndex is 101
208 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
210 follower = createBehavior(context);
211 follower.handleMessage(leaderActor, appendEntries);
213 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
214 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
216 assertFalse(syncStatus.isInitialSyncDone());
217 assertTrue("append entries reply should be true", reply.isSuccess());
221 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
222 logStart("testHandleFirstAppendEntries");
224 MockRaftActorContext context = createActorContext();
225 context.getReplicatedLog().clear(0,2);
226 context.getReplicatedLog().setSnapshotIndex(100);
228 List<ReplicatedLogEntry> entries = Arrays.asList(
229 newReplicatedLogEntry(2, 101, "foo"));
231 // The new commitIndex is 101
232 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
234 follower = createBehavior(context);
235 follower.handleMessage(leaderActor, appendEntries);
237 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
238 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
240 assertFalse(syncStatus.isInitialSyncDone());
241 assertTrue("append entries reply should be true", reply.isSuccess());
245 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
246 logStart("testHandleFirstAppendEntries");
248 MockRaftActorContext context = createActorContext();
249 context.getReplicatedLog().clear(0,2);
250 context.getReplicatedLog().setSnapshotIndex(100);
252 List<ReplicatedLogEntry> entries = Arrays.asList(
253 newReplicatedLogEntry(2, 105, "foo"));
255 // The new commitIndex is 101
256 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
258 follower = createBehavior(context);
259 follower.handleMessage(leaderActor, appendEntries);
261 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
262 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
264 assertFalse(syncStatus.isInitialSyncDone());
265 assertFalse("append entries reply should be false", reply.isSuccess());
269 public void testHandleSyncUpAppendEntries() throws Exception {
270 logStart("testHandleSyncUpAppendEntries");
272 MockRaftActorContext context = createActorContext();
274 List<ReplicatedLogEntry> entries = Arrays.asList(
275 newReplicatedLogEntry(2, 101, "foo"));
277 // The new commitIndex is 101
278 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
280 follower = createBehavior(context);
281 follower.handleMessage(leaderActor, appendEntries);
283 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
285 assertFalse(syncStatus.isInitialSyncDone());
287 // Clear all the messages
288 followerActor.underlyingActor().clear();
290 context.setLastApplied(101);
291 context.setCommitIndex(101);
292 setLastLogEntry(context, 1, 101,
293 new MockRaftActorContext.MockPayload(""));
295 entries = Arrays.asList(
296 newReplicatedLogEntry(2, 101, "foo"));
298 // The new commitIndex is 101
299 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
300 follower.handleMessage(leaderActor, appendEntries);
302 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
304 assertTrue(syncStatus.isInitialSyncDone());
306 followerActor.underlyingActor().clear();
308 // Sending the same message again should not generate another message
310 follower.handleMessage(leaderActor, appendEntries);
312 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
314 assertNull(syncStatus);
319 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
320 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
322 MockRaftActorContext context = createActorContext();
324 List<ReplicatedLogEntry> entries = Arrays.asList(
325 newReplicatedLogEntry(2, 101, "foo"));
327 // The new commitIndex is 101
328 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
330 follower = createBehavior(context);
331 follower.handleMessage(leaderActor, appendEntries);
333 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
335 assertFalse(syncStatus.isInitialSyncDone());
337 // Clear all the messages
338 followerActor.underlyingActor().clear();
340 context.setLastApplied(100);
341 setLastLogEntry(context, 1, 100,
342 new MockRaftActorContext.MockPayload(""));
344 entries = Arrays.asList(
345 newReplicatedLogEntry(2, 101, "foo"));
347 // leader-2 is becoming the leader now and it says the commitIndex is 45
348 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
349 follower.handleMessage(leaderActor, appendEntries);
351 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
353 // We get a new message saying initial status is not done
354 assertFalse(syncStatus.isInitialSyncDone());
360 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
361 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
363 MockRaftActorContext context = createActorContext();
365 List<ReplicatedLogEntry> entries = Arrays.asList(
366 newReplicatedLogEntry(2, 101, "foo"));
368 // The new commitIndex is 101
369 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
371 follower = createBehavior(context);
372 follower.handleMessage(leaderActor, appendEntries);
374 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
376 assertFalse(syncStatus.isInitialSyncDone());
378 // Clear all the messages
379 followerActor.underlyingActor().clear();
381 context.setLastApplied(101);
382 context.setCommitIndex(101);
383 setLastLogEntry(context, 1, 101,
384 new MockRaftActorContext.MockPayload(""));
386 entries = Arrays.asList(
387 newReplicatedLogEntry(2, 101, "foo"));
389 // The new commitIndex is 101
390 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
391 follower.handleMessage(leaderActor, appendEntries);
393 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
395 assertTrue(syncStatus.isInitialSyncDone());
397 // Clear all the messages
398 followerActor.underlyingActor().clear();
400 context.setLastApplied(100);
401 setLastLogEntry(context, 1, 100,
402 new MockRaftActorContext.MockPayload(""));
404 entries = Arrays.asList(
405 newReplicatedLogEntry(2, 101, "foo"));
407 // leader-2 is becoming the leader now and it says the commitIndex is 45
408 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
409 follower.handleMessage(leaderActor, appendEntries);
411 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
413 // We get a new message saying initial status is not done
414 assertFalse(syncStatus.isInitialSyncDone());
420 * This test verifies that when an AppendEntries RPC is received by a RaftActor
421 * with a commitIndex that is greater than what has been applied to the
422 * state machine of the RaftActor, the RaftActor applies the state and
423 * sets it current applied state to the commitIndex of the sender.
428 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
429 logStart("testHandleAppendEntriesWithNewerCommitIndex");
431 MockRaftActorContext context = createActorContext();
433 context.setLastApplied(100);
434 setLastLogEntry(context, 1, 100,
435 new MockRaftActorContext.MockPayload(""));
436 context.getReplicatedLog().setSnapshotIndex(99);
438 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
439 newReplicatedLogEntry(2, 101, "foo"));
441 // The new commitIndex is 101
442 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
444 follower = createBehavior(context);
445 follower.handleMessage(leaderActor, appendEntries);
447 assertEquals("getLastApplied", 101L, context.getLastApplied());
451 * This test verifies that when an AppendEntries is received a specific prevLogTerm
452 * which does not match the term that is in RaftActors log entry at prevLogIndex
453 * then the RaftActor does not change it's state and it returns a failure.
458 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
459 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
461 MockRaftActorContext context = createActorContext();
463 // First set the receivers term to lower number
464 context.getTermInformation().update(95, "test");
466 // AppendEntries is now sent with a bigger term
467 // this will set the receivers term to be the same as the sender's term
468 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
470 follower = createBehavior(context);
472 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
474 Assert.assertSame(follower, newBehavior);
476 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
477 AppendEntriesReply.class);
479 assertEquals("isSuccess", false, reply.isSuccess());
483 * This test verifies that when a new AppendEntries message is received with
484 * new entries and the logs of the sender and receiver match that the new
485 * entries get added to the log and the log is incremented by the number of
486 * entries received in appendEntries
491 public void testHandleAppendEntriesAddNewEntries() {
492 logStart("testHandleAppendEntriesAddNewEntries");
494 MockRaftActorContext context = createActorContext();
496 // First set the receivers term to lower number
497 context.getTermInformation().update(1, "test");
499 // Prepare the receivers log
500 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
501 log.append(newReplicatedLogEntry(1, 0, "zero"));
502 log.append(newReplicatedLogEntry(1, 1, "one"));
503 log.append(newReplicatedLogEntry(1, 2, "two"));
505 context.setReplicatedLog(log);
507 // Prepare the entries to be sent with AppendEntries
508 List<ReplicatedLogEntry> entries = new ArrayList<>();
509 entries.add(newReplicatedLogEntry(1, 3, "three"));
510 entries.add(newReplicatedLogEntry(1, 4, "four"));
512 // Send appendEntries with the same term as was set on the receiver
513 // before the new behavior was created (1 in this case)
514 // This will not work for a Candidate because as soon as a Candidate
515 // is created it increments the term
516 short leaderPayloadVersion = 10;
517 String leaderId = "leader-1";
518 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
520 follower = createBehavior(context);
522 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
524 Assert.assertSame(follower, newBehavior);
526 assertEquals("Next index", 5, log.last().getIndex() + 1);
527 assertEquals("Entry 3", entries.get(0), log.get(3));
528 assertEquals("Entry 4", entries.get(1), log.get(4));
530 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
531 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
533 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
537 * This test verifies that when a new AppendEntries message is received with
538 * new entries and the logs of the sender and receiver are out-of-sync that
539 * the log is first corrected by removing the out of sync entries from the
540 * log and then adding in the new entries sent with the AppendEntries message
543 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
544 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
546 MockRaftActorContext context = createActorContext();
548 // First set the receivers term to lower number
549 context.getTermInformation().update(1, "test");
551 // Prepare the receivers log
552 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
553 log.append(newReplicatedLogEntry(1, 0, "zero"));
554 log.append(newReplicatedLogEntry(1, 1, "one"));
555 log.append(newReplicatedLogEntry(1, 2, "two"));
557 context.setReplicatedLog(log);
559 // Prepare the entries to be sent with AppendEntries
560 List<ReplicatedLogEntry> entries = new ArrayList<>();
561 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
562 entries.add(newReplicatedLogEntry(2, 3, "three"));
564 // Send appendEntries with the same term as was set on the receiver
565 // before the new behavior was created (1 in this case)
566 // This will not work for a Candidate because as soon as a Candidate
567 // is created it increments the term
568 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
570 follower = createBehavior(context);
572 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
574 Assert.assertSame(follower, newBehavior);
576 // The entry at index 2 will be found out-of-sync with the leader
577 // and will be removed
578 // Then the two new entries will be added to the log
579 // Thus making the log to have 4 entries
580 assertEquals("Next index", 4, log.last().getIndex() + 1);
581 //assertEquals("Entry 2", entries.get(0), log.get(2));
583 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
585 // Check that the entry at index 2 has the new data
586 assertEquals("Entry 2", entries.get(0), log.get(2));
588 assertEquals("Entry 3", entries.get(1), log.get(3));
590 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
594 public void testHandleAppendEntriesPreviousLogEntryMissing(){
595 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
597 MockRaftActorContext context = createActorContext();
599 // Prepare the receivers log
600 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
601 log.append(newReplicatedLogEntry(1, 0, "zero"));
602 log.append(newReplicatedLogEntry(1, 1, "one"));
603 log.append(newReplicatedLogEntry(1, 2, "two"));
605 context.setReplicatedLog(log);
607 // Prepare the entries to be sent with AppendEntries
608 List<ReplicatedLogEntry> entries = new ArrayList<>();
609 entries.add(newReplicatedLogEntry(1, 4, "four"));
611 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
613 follower = createBehavior(context);
615 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
617 Assert.assertSame(follower, newBehavior);
619 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
623 public void testHandleAppendEntriesWithExistingLogEntry() {
624 logStart("testHandleAppendEntriesWithExistingLogEntry");
626 MockRaftActorContext context = createActorContext();
628 context.getTermInformation().update(1, "test");
630 // Prepare the receivers log
631 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
632 log.append(newReplicatedLogEntry(1, 0, "zero"));
633 log.append(newReplicatedLogEntry(1, 1, "one"));
635 context.setReplicatedLog(log);
637 // Send the last entry again.
638 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
640 follower = createBehavior(context);
642 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
644 assertEquals("Next index", 2, log.last().getIndex() + 1);
645 assertEquals("Entry 1", entries.get(0), log.get(1));
647 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
649 // Send the last entry again and also a new one.
651 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
653 leaderActor.underlyingActor().clear();
654 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
656 assertEquals("Next index", 3, log.last().getIndex() + 1);
657 assertEquals("Entry 1", entries.get(0), log.get(1));
658 assertEquals("Entry 2", entries.get(1), log.get(2));
660 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
664 public void testHandleAppendEntriesAfterInstallingSnapshot(){
665 logStart("testHandleAppendAfterInstallingSnapshot");
667 MockRaftActorContext context = createActorContext();
669 // Prepare the receivers log
670 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
672 // Set up a log as if it has been snapshotted
673 log.setSnapshotIndex(3);
674 log.setSnapshotTerm(1);
676 context.setReplicatedLog(log);
678 // Prepare the entries to be sent with AppendEntries
679 List<ReplicatedLogEntry> entries = new ArrayList<>();
680 entries.add(newReplicatedLogEntry(1, 4, "four"));
682 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
684 follower = createBehavior(context);
686 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
688 Assert.assertSame(follower, newBehavior);
690 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
695 * This test verifies that when InstallSnapshot is received by
696 * the follower its applied correctly.
701 public void testHandleInstallSnapshot() throws Exception {
702 logStart("testHandleInstallSnapshot");
704 MockRaftActorContext context = createActorContext();
706 follower = createBehavior(context);
708 ByteString bsSnapshot = createSnapshot();
710 int snapshotLength = bsSnapshot.size();
712 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
713 int lastIncludedIndex = 1;
715 InstallSnapshot lastInstallSnapshot = null;
717 for(int i = 0; i < totalChunks; i++) {
718 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
719 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
720 chunkData, chunkIndex, totalChunks);
721 follower.handleMessage(leaderActor, lastInstallSnapshot);
722 offset = offset + 50;
727 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
728 ApplySnapshot.class);
729 Snapshot snapshot = applySnapshot.getSnapshot();
730 assertNotNull(lastInstallSnapshot);
731 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
732 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
733 snapshot.getLastAppliedTerm());
734 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
735 snapshot.getLastAppliedIndex());
736 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
737 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
739 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
740 leaderActor, InstallSnapshotReply.class);
741 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
744 for(InstallSnapshotReply reply: replies) {
745 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
746 assertEquals("getTerm", 1, reply.getTerm());
747 assertEquals("isSuccess", true, reply.isSuccess());
748 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
751 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
756 * Verify that when an AppendEntries is sent to a follower during a snapshot install
757 * the Follower short-circuits the processing of the AppendEntries message.
762 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
763 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
765 MockRaftActorContext context = createActorContext();
767 follower = createBehavior(context);
769 ByteString bsSnapshot = createSnapshot();
770 int snapshotLength = bsSnapshot.size();
772 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
773 int lastIncludedIndex = 1;
775 // Check that snapshot installation is not in progress
776 assertNull(((Follower) follower).getSnapshotTracker());
778 // Make sure that we have more than 1 chunk to send
779 assertTrue(totalChunks > 1);
781 // Send an install snapshot with the first chunk to start the process of installing a snapshot
782 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
783 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
784 chunkData, 1, totalChunks));
786 // Check if snapshot installation is in progress now
787 assertNotNull(((Follower) follower).getSnapshotTracker());
789 // Send an append entry
790 AppendEntries appendEntries = mock(AppendEntries.class);
791 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
793 follower.handleMessage(leaderActor, appendEntries);
795 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
796 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
797 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
798 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
800 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
801 verify(appendEntries, never()).getPrevLogIndex();
806 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
807 logStart("testInitialSyncUpWithHandleInstallSnapshot");
809 MockRaftActorContext context = createActorContext();
811 follower = createBehavior(context);
813 ByteString bsSnapshot = createSnapshot();
815 int snapshotLength = bsSnapshot.size();
817 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
818 int lastIncludedIndex = 1;
820 InstallSnapshot lastInstallSnapshot = null;
822 for(int i = 0; i < totalChunks; i++) {
823 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
824 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
825 chunkData, chunkIndex, totalChunks);
826 follower.handleMessage(leaderActor, lastInstallSnapshot);
827 offset = offset + 50;
832 FollowerInitialSyncUpStatus syncStatus =
833 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
835 assertFalse(syncStatus.isInitialSyncDone());
837 // Clear all the messages
838 followerActor.underlyingActor().clear();
840 context.setLastApplied(101);
841 context.setCommitIndex(101);
842 setLastLogEntry(context, 1, 101,
843 new MockRaftActorContext.MockPayload(""));
845 List<ReplicatedLogEntry> entries = Arrays.asList(
846 newReplicatedLogEntry(2, 101, "foo"));
848 // The new commitIndex is 101
849 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
850 follower.handleMessage(leaderActor, appendEntries);
852 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
854 assertTrue(syncStatus.isInitialSyncDone());
/**
 * Sends a freshly created follower a single InstallSnapshot message and verifies that
 * it is rejected: the InstallSnapshotReply collected by leaderActor must report
 * isSuccess == false, chunk index -1, term 1 and the follower's own id, and the
 * follower must have no snapshot tracker afterwards.
 */
858 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
859 logStart("testHandleOutOfSequenceInstallSnapshot");
861 MockRaftActorContext context = createActorContext();
863 follower = createBehavior(context);
865 ByteString bsSnapshot = createSnapshot();
// No earlier chunk has been delivered in this test; the last two arguments are
// presumably chunkIndex = 3 and totalChunks = 3 - confirm against the InstallSnapshot constructor.
867 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
868 getNextChunk(bsSnapshot, 10, 50), 3, 3);
869 follower.handleMessage(leaderActor, installSnapshot);
871 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
872 InstallSnapshotReply.class);
// The reply must signal failure and carry -1 as the chunk index.
874 assertEquals("isSuccess", false, reply.isSuccess());
875 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
876 assertEquals("getTerm", 1, reply.getTerm());
877 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
// The follower must not be holding a snapshot tracker after the rejected chunk.
879 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
/**
 * Creates a follower and verifies that an ElectionTimeout message reaches followerActor
 * in less time than the configured election timeout interval.
 *
 * NOTE(review): nothing visible here configures peers - presumably the context returned
 * by createActorContext() has none, as the method name implies; confirm.
 */
883 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
884 MockRaftActorContext context = createActorContext();
// Start timing before the follower is created so the measurement includes its construction.
886 Stopwatch stopwatch = Stopwatch.createStarted();
888 follower = createBehavior(context);
890 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
892 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// The timeout must have arrived sooner than one full election timeout interval.
894 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
/**
 * Creates a follower with a 100 ms election timeout interval and a raft policy built by
 * createRaftPolicy(false, false), then verifies that followerActor collects no
 * ElectionTimeout message.
 */
898 public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
899 MockRaftActorContext context = createActorContext();
// Override the default configuration so the election timeout interval is 100 ms.
900 context.setConfigParams(new DefaultConfigParamsImpl(){
902 public FiniteDuration getElectionTimeOutInterval() {
903 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// Presumably one of these flags disables automatic elections - confirm against createRaftPolicy.
907 context.setRaftPolicy(createRaftPolicy(false, false));
909 follower = createBehavior(context);
// 500 is presumably a wait in milliseconds, i.e. several 100 ms timeout intervals - confirm
// against MessageCollectorActor.assertNoneMatching.
911 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
/**
 * Returns a slice of {@code bs} beginning at {@code start} and at most {@code chunkSize}
 * bytes long, shortened where it would otherwise run past the end of {@code bs}.
 *
 * NOTE(review): the declaration of {@code start} is not among the lines shown here -
 * presumably it is initialised from {@code offset}; confirm.
 *
 * @param bs the byte string to slice
 * @param offset presumably the position in {@code bs} at which the chunk begins
 * @param chunkSize the maximum number of bytes in the returned chunk
 * @return the substring of {@code bs} covering {@code [start, start + size)}
 */
915 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
916 int snapshotLength = bs.size();
918 int size = chunkSize;
// A chunk size larger than the whole snapshot is reduced to the snapshot length.
919 if (chunkSize > snapshotLength) {
920 size = snapshotLength;
// A chunk that would extend past the end is cut down to the bytes remaining after start.
922 if ((start + chunkSize) > snapshotLength) {
923 size = snapshotLength - start;
926 return bs.substring(start, start + size);
/**
 * Takes the first AppendEntriesReply collected by leaderActor and asserts that its
 * fields equal the expected values supplied by the caller. The reply's payload version
 * is compared against the payloadVersion value defined outside this method.
 *
 * @param expTerm expected value of reply.getTerm()
 * @param expSuccess expected value of reply.isSuccess()
 * @param expFollowerId expected value of reply.getFollowerId()
 * @param expLogLastTerm expected value of reply.getLogLastTerm()
 * @param expLogLastIndex expected value of reply.getLogLastIndex()
 */
929 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
930 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
932 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
933 AppendEntriesReply.class);
// Each assertion message names the getter being checked, to make failures easy to read.
935 assertEquals("isSuccess", expSuccess, reply.isSuccess());
936 assertEquals("getTerm", expTerm, reply.getTerm());
937 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
938 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
939 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
940 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
/**
 * Builds a MockReplicatedLogEntry from the given term and index, wrapping {@code data}
 * in a MockPayload.
 *
 * @param term first argument passed to the MockReplicatedLogEntry constructor
 * @param index second argument passed to the MockReplicatedLogEntry constructor
 * @param data string used to construct the entry's MockPayload
 * @return the newly created log entry
 */
943 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
944 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
945 new MockRaftActorContext.MockPayload(data));
/**
 * Builds the fixed snapshot used by the tests: a three-entry map ("1" to "A", "2" to "B",
 * "3" to "C") converted to a ByteString by toByteString.
 *
 * @return the result of toByteString applied to that map
 */
948 private ByteString createSnapshot(){
949 HashMap<String, String> followerSnapshot = new HashMap<>();
950 followerSnapshot.put("1", "A");
951 followerSnapshot.put("2", "B");
952 followerSnapshot.put("3", "C");
954 return toByteString(followerSnapshot);
/**
 * Runs the superclass check of the same name and then additionally asserts the votedFor
 * value held in the context's term information: the candidate id when {@code rpc} is a
 * RequestVote, otherwise null.
 *
 * @param actorContext context whose term information is inspected
 * @param actorRef passed through unchanged to the superclass check
 * @param rpc the RPC message passed to the superclass check
 * @throws Exception declared to match the superclass check this method calls
 */
958 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
959 ActorRef actorRef, RaftRPC rpc) throws Exception {
960 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
// Only a RequestVote carries a candidate id; for any other RPC votedFor is expected to be null.
962 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
963 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
967 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
969 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
970 assertEquals("isSuccess", true, reply.isSuccess());