2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.testkit.TestActorRef;
25 import com.google.common.base.Stopwatch;
26 import com.google.protobuf.ByteString;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
36 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
37 import org.opendaylight.controller.cluster.raft.RaftActorContext;
38 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
41 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
42 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
43 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
45 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
46 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
47 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
48 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
49 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
50 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
51 import scala.concurrent.duration.FiniteDuration;
53 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
55 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
56 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
58 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
59 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
61 private Follower follower;
63 private final short payloadVersion = 5;
67 public void tearDown() throws Exception {
68 if(follower != null) {
76 protected Follower createBehavior(RaftActorContext actorContext) {
77 return spy(new Follower(actorContext));
81 protected MockRaftActorContext createActorContext() {
82 return createActorContext(followerActor);
86 protected MockRaftActorContext createActorContext(ActorRef actorRef){
87 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
88 context.setPayloadVersion(payloadVersion );
93 public void testThatAnElectionTimeoutIsTriggered(){
94 MockRaftActorContext actorContext = createActorContext();
95 follower = new Follower(actorContext);
97 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
98 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
102 public void testHandleElectionTimeout(){
103 logStart("testHandleElectionTimeout");
105 follower = new Follower(createActorContext());
107 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE);
109 assertTrue(raftBehavior instanceof Candidate);
113 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
114 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
116 MockRaftActorContext context = createActorContext();
118 context.getTermInformation().update(term, null);
120 follower = createBehavior(context);
122 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
124 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
126 assertEquals("isVoteGranted", true, reply.isVoteGranted());
127 assertEquals("getTerm", term, reply.getTerm());
128 verify(follower).scheduleElection(any(FiniteDuration.class));
132 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
133 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
135 MockRaftActorContext context = createActorContext();
137 context.getTermInformation().update(term, "test");
139 follower = createBehavior(context);
141 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
143 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
145 assertEquals("isVoteGranted", false, reply.isVoteGranted());
146 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
151 public void testHandleFirstAppendEntries() throws Exception {
152 logStart("testHandleFirstAppendEntries");
154 MockRaftActorContext context = createActorContext();
155 context.getReplicatedLog().clear(0,2);
156 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
157 context.getReplicatedLog().setSnapshotIndex(99);
159 List<ReplicatedLogEntry> entries = Arrays.asList(
160 newReplicatedLogEntry(2, 101, "foo"));
162 Assert.assertEquals(1, context.getReplicatedLog().size());
164 // The new commitIndex is 101
165 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
167 follower = createBehavior(context);
168 follower.handleMessage(leaderActor, appendEntries);
170 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
171 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
173 assertFalse(syncStatus.isInitialSyncDone());
174 assertTrue("append entries reply should be true", reply.isSuccess());
178 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
179 logStart("testHandleFirstAppendEntries");
181 MockRaftActorContext context = createActorContext();
183 List<ReplicatedLogEntry> entries = Arrays.asList(
184 newReplicatedLogEntry(2, 101, "foo"));
186 // The new commitIndex is 101
187 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
189 follower = createBehavior(context);
190 follower.handleMessage(leaderActor, appendEntries);
192 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
193 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
195 assertFalse(syncStatus.isInitialSyncDone());
196 assertFalse("append entries reply should be false", reply.isSuccess());
200 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
201 logStart("testHandleFirstAppendEntries");
203 MockRaftActorContext context = createActorContext();
204 context.getReplicatedLog().clear(0,2);
205 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
206 context.getReplicatedLog().setSnapshotIndex(99);
208 List<ReplicatedLogEntry> entries = Arrays.asList(
209 newReplicatedLogEntry(2, 101, "foo"));
211 // The new commitIndex is 101
212 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
214 follower = createBehavior(context);
215 follower.handleMessage(leaderActor, appendEntries);
217 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
218 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
220 assertFalse(syncStatus.isInitialSyncDone());
221 assertTrue("append entries reply should be true", reply.isSuccess());
225 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
226 logStart("testHandleFirstAppendEntries");
228 MockRaftActorContext context = createActorContext();
229 context.getReplicatedLog().clear(0,2);
230 context.getReplicatedLog().setSnapshotIndex(100);
232 List<ReplicatedLogEntry> entries = Arrays.asList(
233 newReplicatedLogEntry(2, 101, "foo"));
235 // The new commitIndex is 101
236 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
238 follower = createBehavior(context);
239 follower.handleMessage(leaderActor, appendEntries);
241 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
242 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
244 assertFalse(syncStatus.isInitialSyncDone());
245 assertTrue("append entries reply should be true", reply.isSuccess());
249 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
250 logStart("testHandleFirstAppendEntries");
252 MockRaftActorContext context = createActorContext();
253 context.getReplicatedLog().clear(0,2);
254 context.getReplicatedLog().setSnapshotIndex(100);
256 List<ReplicatedLogEntry> entries = Arrays.asList(
257 newReplicatedLogEntry(2, 105, "foo"));
259 // The new commitIndex is 101
260 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
262 follower = createBehavior(context);
263 follower.handleMessage(leaderActor, appendEntries);
265 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
266 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
268 assertFalse(syncStatus.isInitialSyncDone());
269 assertFalse("append entries reply should be false", reply.isSuccess());
273 public void testHandleSyncUpAppendEntries() throws Exception {
274 logStart("testHandleSyncUpAppendEntries");
276 MockRaftActorContext context = createActorContext();
278 List<ReplicatedLogEntry> entries = Arrays.asList(
279 newReplicatedLogEntry(2, 101, "foo"));
281 // The new commitIndex is 101
282 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
284 follower = createBehavior(context);
285 follower.handleMessage(leaderActor, appendEntries);
287 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
289 assertFalse(syncStatus.isInitialSyncDone());
291 // Clear all the messages
292 followerActor.underlyingActor().clear();
294 context.setLastApplied(101);
295 context.setCommitIndex(101);
296 setLastLogEntry(context, 1, 101,
297 new MockRaftActorContext.MockPayload(""));
299 entries = Arrays.asList(
300 newReplicatedLogEntry(2, 101, "foo"));
302 // The new commitIndex is 101
303 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
304 follower.handleMessage(leaderActor, appendEntries);
306 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
308 assertTrue(syncStatus.isInitialSyncDone());
310 followerActor.underlyingActor().clear();
312 // Sending the same message again should not generate another message
314 follower.handleMessage(leaderActor, appendEntries);
316 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
318 assertNull(syncStatus);
323 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
324 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
326 MockRaftActorContext context = createActorContext();
328 List<ReplicatedLogEntry> entries = Arrays.asList(
329 newReplicatedLogEntry(2, 101, "foo"));
331 // The new commitIndex is 101
332 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
334 follower = createBehavior(context);
335 follower.handleMessage(leaderActor, appendEntries);
337 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
339 assertFalse(syncStatus.isInitialSyncDone());
341 // Clear all the messages
342 followerActor.underlyingActor().clear();
344 context.setLastApplied(100);
345 setLastLogEntry(context, 1, 100,
346 new MockRaftActorContext.MockPayload(""));
348 entries = Arrays.asList(
349 newReplicatedLogEntry(2, 101, "foo"));
351 // leader-2 is becoming the leader now and it says the commitIndex is 45
352 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
353 follower.handleMessage(leaderActor, appendEntries);
355 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
357 // We get a new message saying initial status is not done
358 assertFalse(syncStatus.isInitialSyncDone());
364 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
365 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
367 MockRaftActorContext context = createActorContext();
369 List<ReplicatedLogEntry> entries = Arrays.asList(
370 newReplicatedLogEntry(2, 101, "foo"));
372 // The new commitIndex is 101
373 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
375 follower = createBehavior(context);
376 follower.handleMessage(leaderActor, appendEntries);
378 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
380 assertFalse(syncStatus.isInitialSyncDone());
382 // Clear all the messages
383 followerActor.underlyingActor().clear();
385 context.setLastApplied(101);
386 context.setCommitIndex(101);
387 setLastLogEntry(context, 1, 101,
388 new MockRaftActorContext.MockPayload(""));
390 entries = Arrays.asList(
391 newReplicatedLogEntry(2, 101, "foo"));
393 // The new commitIndex is 101
394 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
395 follower.handleMessage(leaderActor, appendEntries);
397 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
399 assertTrue(syncStatus.isInitialSyncDone());
401 // Clear all the messages
402 followerActor.underlyingActor().clear();
404 context.setLastApplied(100);
405 setLastLogEntry(context, 1, 100,
406 new MockRaftActorContext.MockPayload(""));
408 entries = Arrays.asList(
409 newReplicatedLogEntry(2, 101, "foo"));
411 // leader-2 is becoming the leader now and it says the commitIndex is 45
412 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
413 follower.handleMessage(leaderActor, appendEntries);
415 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
417 // We get a new message saying initial status is not done
418 assertFalse(syncStatus.isInitialSyncDone());
424 * This test verifies that when an AppendEntries RPC is received by a RaftActor
425 * with a commitIndex that is greater than what has been applied to the
426 * state machine of the RaftActor, the RaftActor applies the state and
427 * sets it current applied state to the commitIndex of the sender.
432 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
433 logStart("testHandleAppendEntriesWithNewerCommitIndex");
435 MockRaftActorContext context = createActorContext();
437 context.setLastApplied(100);
438 setLastLogEntry(context, 1, 100,
439 new MockRaftActorContext.MockPayload(""));
440 context.getReplicatedLog().setSnapshotIndex(99);
442 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
443 newReplicatedLogEntry(2, 101, "foo"));
445 // The new commitIndex is 101
446 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
448 follower = createBehavior(context);
449 follower.handleMessage(leaderActor, appendEntries);
451 assertEquals("getLastApplied", 101L, context.getLastApplied());
455 * This test verifies that when an AppendEntries is received a specific prevLogTerm
456 * which does not match the term that is in RaftActors log entry at prevLogIndex
457 * then the RaftActor does not change it's state and it returns a failure.
462 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
463 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
465 MockRaftActorContext context = createActorContext();
467 // First set the receivers term to lower number
468 context.getTermInformation().update(95, "test");
470 // AppendEntries is now sent with a bigger term
471 // this will set the receivers term to be the same as the sender's term
472 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
474 follower = createBehavior(context);
476 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
478 Assert.assertSame(follower, newBehavior);
480 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
481 AppendEntriesReply.class);
483 assertEquals("isSuccess", false, reply.isSuccess());
487 * This test verifies that when a new AppendEntries message is received with
488 * new entries and the logs of the sender and receiver match that the new
489 * entries get added to the log and the log is incremented by the number of
490 * entries received in appendEntries
495 public void testHandleAppendEntriesAddNewEntries() {
496 logStart("testHandleAppendEntriesAddNewEntries");
498 MockRaftActorContext context = createActorContext();
500 // First set the receivers term to lower number
501 context.getTermInformation().update(1, "test");
503 // Prepare the receivers log
504 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
505 log.append(newReplicatedLogEntry(1, 0, "zero"));
506 log.append(newReplicatedLogEntry(1, 1, "one"));
507 log.append(newReplicatedLogEntry(1, 2, "two"));
509 context.setReplicatedLog(log);
511 // Prepare the entries to be sent with AppendEntries
512 List<ReplicatedLogEntry> entries = new ArrayList<>();
513 entries.add(newReplicatedLogEntry(1, 3, "three"));
514 entries.add(newReplicatedLogEntry(1, 4, "four"));
516 // Send appendEntries with the same term as was set on the receiver
517 // before the new behavior was created (1 in this case)
518 // This will not work for a Candidate because as soon as a Candidate
519 // is created it increments the term
520 short leaderPayloadVersion = 10;
521 String leaderId = "leader-1";
522 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
524 follower = createBehavior(context);
526 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
528 Assert.assertSame(follower, newBehavior);
530 assertEquals("Next index", 5, log.last().getIndex() + 1);
531 assertEquals("Entry 3", entries.get(0), log.get(3));
532 assertEquals("Entry 4", entries.get(1), log.get(4));
534 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
535 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
537 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
541 * This test verifies that when a new AppendEntries message is received with
542 * new entries and the logs of the sender and receiver are out-of-sync that
543 * the log is first corrected by removing the out of sync entries from the
544 * log and then adding in the new entries sent with the AppendEntries message
547 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
548 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
550 MockRaftActorContext context = createActorContext();
552 // First set the receivers term to lower number
553 context.getTermInformation().update(1, "test");
555 // Prepare the receivers log
556 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
557 log.append(newReplicatedLogEntry(1, 0, "zero"));
558 log.append(newReplicatedLogEntry(1, 1, "one"));
559 log.append(newReplicatedLogEntry(1, 2, "two"));
561 context.setReplicatedLog(log);
563 // Prepare the entries to be sent with AppendEntries
564 List<ReplicatedLogEntry> entries = new ArrayList<>();
565 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
566 entries.add(newReplicatedLogEntry(2, 3, "three"));
568 // Send appendEntries with the same term as was set on the receiver
569 // before the new behavior was created (1 in this case)
570 // This will not work for a Candidate because as soon as a Candidate
571 // is created it increments the term
572 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
574 follower = createBehavior(context);
576 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
578 Assert.assertSame(follower, newBehavior);
580 // The entry at index 2 will be found out-of-sync with the leader
581 // and will be removed
582 // Then the two new entries will be added to the log
583 // Thus making the log to have 4 entries
584 assertEquals("Next index", 4, log.last().getIndex() + 1);
585 //assertEquals("Entry 2", entries.get(0), log.get(2));
587 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
589 // Check that the entry at index 2 has the new data
590 assertEquals("Entry 2", entries.get(0), log.get(2));
592 assertEquals("Entry 3", entries.get(1), log.get(3));
594 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
598 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
599 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
601 MockRaftActorContext context = createActorContext();
603 // First set the receivers term to lower number
604 context.getTermInformation().update(1, "test");
606 // Prepare the receivers log
607 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
608 log.append(newReplicatedLogEntry(1, 0, "zero"));
609 log.append(newReplicatedLogEntry(1, 1, "one"));
610 log.append(newReplicatedLogEntry(1, 2, "two"));
612 context.setReplicatedLog(log);
614 // Prepare the entries to be sent with AppendEntries
615 List<ReplicatedLogEntry> entries = new ArrayList<>();
616 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
617 entries.add(newReplicatedLogEntry(2, 3, "three"));
619 // Send appendEntries with the same term as was set on the receiver
620 // before the new behavior was created (1 in this case)
621 // This will not work for a Candidate because as soon as a Candidate
622 // is created it increments the term
623 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
625 context.setRaftPolicy(createRaftPolicy(false, true));
626 follower = createBehavior(context);
628 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
630 Assert.assertSame(follower, newBehavior);
632 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
636 public void testHandleAppendEntriesPreviousLogEntryMissing(){
637 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
639 MockRaftActorContext context = createActorContext();
641 // Prepare the receivers log
642 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
643 log.append(newReplicatedLogEntry(1, 0, "zero"));
644 log.append(newReplicatedLogEntry(1, 1, "one"));
645 log.append(newReplicatedLogEntry(1, 2, "two"));
647 context.setReplicatedLog(log);
649 // Prepare the entries to be sent with AppendEntries
650 List<ReplicatedLogEntry> entries = new ArrayList<>();
651 entries.add(newReplicatedLogEntry(1, 4, "four"));
653 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
655 follower = createBehavior(context);
657 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
659 Assert.assertSame(follower, newBehavior);
661 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
665 public void testHandleAppendEntriesWithExistingLogEntry() {
666 logStart("testHandleAppendEntriesWithExistingLogEntry");
668 MockRaftActorContext context = createActorContext();
670 context.getTermInformation().update(1, "test");
672 // Prepare the receivers log
673 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
674 log.append(newReplicatedLogEntry(1, 0, "zero"));
675 log.append(newReplicatedLogEntry(1, 1, "one"));
677 context.setReplicatedLog(log);
679 // Send the last entry again.
680 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
682 follower = createBehavior(context);
684 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
686 assertEquals("Next index", 2, log.last().getIndex() + 1);
687 assertEquals("Entry 1", entries.get(0), log.get(1));
689 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
691 // Send the last entry again and also a new one.
693 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
695 leaderActor.underlyingActor().clear();
696 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
698 assertEquals("Next index", 3, log.last().getIndex() + 1);
699 assertEquals("Entry 1", entries.get(0), log.get(1));
700 assertEquals("Entry 2", entries.get(1), log.get(2));
702 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
706 public void testHandleAppendEntriesAfterInstallingSnapshot(){
707 logStart("testHandleAppendAfterInstallingSnapshot");
709 MockRaftActorContext context = createActorContext();
711 // Prepare the receivers log
712 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
714 // Set up a log as if it has been snapshotted
715 log.setSnapshotIndex(3);
716 log.setSnapshotTerm(1);
718 context.setReplicatedLog(log);
720 // Prepare the entries to be sent with AppendEntries
721 List<ReplicatedLogEntry> entries = new ArrayList<>();
722 entries.add(newReplicatedLogEntry(1, 4, "four"));
724 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
726 follower = createBehavior(context);
728 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
730 Assert.assertSame(follower, newBehavior);
732 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
737 * This test verifies that when InstallSnapshot is received by
738 * the follower its applied correctly.
743 public void testHandleInstallSnapshot() throws Exception {
744 logStart("testHandleInstallSnapshot");
746 MockRaftActorContext context = createActorContext();
747 context.getTermInformation().update(1, "leader");
749 follower = createBehavior(context);
751 ByteString bsSnapshot = createSnapshot();
753 int snapshotLength = bsSnapshot.size();
755 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
756 int lastIncludedIndex = 1;
758 InstallSnapshot lastInstallSnapshot = null;
760 for(int i = 0; i < totalChunks; i++) {
761 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
762 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
763 chunkData, chunkIndex, totalChunks);
764 follower.handleMessage(leaderActor, lastInstallSnapshot);
765 offset = offset + 50;
770 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
771 ApplySnapshot.class);
772 Snapshot snapshot = applySnapshot.getSnapshot();
773 assertNotNull(lastInstallSnapshot);
774 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
775 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
776 snapshot.getLastAppliedTerm());
777 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
778 snapshot.getLastAppliedIndex());
779 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
780 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
781 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
782 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
783 applySnapshot.getCallback().onSuccess();
785 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
786 leaderActor, InstallSnapshotReply.class);
787 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
790 for(InstallSnapshotReply reply: replies) {
791 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
792 assertEquals("getTerm", 1, reply.getTerm());
793 assertEquals("isSuccess", true, reply.isSuccess());
794 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
797 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
802 * Verify that when an AppendEntries is sent to a follower during a snapshot install
803 * the Follower short-circuits the processing of the AppendEntries message.
808 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
809 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
811 MockRaftActorContext context = createActorContext();
813 follower = createBehavior(context);
815 ByteString bsSnapshot = createSnapshot();
816 int snapshotLength = bsSnapshot.size();
818 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
819 int lastIncludedIndex = 1;
821 // Check that snapshot installation is not in progress
822 assertNull(follower.getSnapshotTracker());
824 // Make sure that we have more than 1 chunk to send
825 assertTrue(totalChunks > 1);
827 // Send an install snapshot with the first chunk to start the process of installing a snapshot
828 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
829 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
830 chunkData, 1, totalChunks));
832 // Check if snapshot installation is in progress now
833 assertNotNull(follower.getSnapshotTracker());
835 // Send an append entry
836 AppendEntries appendEntries = mock(AppendEntries.class);
837 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
839 follower.handleMessage(leaderActor, appendEntries);
841 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
842 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
843 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
844 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
846 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
847 verify(appendEntries, never()).getPrevLogIndex();
852 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
853 logStart("testInitialSyncUpWithHandleInstallSnapshot");
855 MockRaftActorContext context = createActorContext();
857 follower = createBehavior(context);
859 ByteString bsSnapshot = createSnapshot();
861 int snapshotLength = bsSnapshot.size();
863 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
864 int lastIncludedIndex = 1;
866 InstallSnapshot lastInstallSnapshot = null;
868 for(int i = 0; i < totalChunks; i++) {
869 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
870 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
871 chunkData, chunkIndex, totalChunks);
872 follower.handleMessage(leaderActor, lastInstallSnapshot);
873 offset = offset + 50;
878 FollowerInitialSyncUpStatus syncStatus =
879 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
881 assertFalse(syncStatus.isInitialSyncDone());
883 // Clear all the messages
884 followerActor.underlyingActor().clear();
886 context.setLastApplied(101);
887 context.setCommitIndex(101);
888 setLastLogEntry(context, 1, 101,
889 new MockRaftActorContext.MockPayload(""));
891 List<ReplicatedLogEntry> entries = Arrays.asList(
892 newReplicatedLogEntry(2, 101, "foo"));
894 // The new commitIndex is 101
895 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
896 follower.handleMessage(leaderActor, appendEntries);
898 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
900 assertTrue(syncStatus.isInitialSyncDone());
904 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
905 logStart("testHandleOutOfSequenceInstallSnapshot");
907 MockRaftActorContext context = createActorContext();
909 follower = createBehavior(context);
911 ByteString bsSnapshot = createSnapshot();
913 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
914 getNextChunk(bsSnapshot, 10, 50), 3, 3);
915 follower.handleMessage(leaderActor, installSnapshot);
917 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
918 InstallSnapshotReply.class);
920 assertEquals("isSuccess", false, reply.isSuccess());
921 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
922 assertEquals("getTerm", 1, reply.getTerm());
923 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
925 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
929 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
930 MockRaftActorContext context = createActorContext();
932 Stopwatch stopwatch = Stopwatch.createStarted();
934 follower = createBehavior(context);
936 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
938 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
940 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
944 public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
945 MockRaftActorContext context = createActorContext();
946 context.setConfigParams(new DefaultConfigParamsImpl(){
948 public FiniteDuration getElectionTimeOutInterval() {
949 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
953 context.setRaftPolicy(createRaftPolicy(false, false));
955 follower = createBehavior(context);
957 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
961 public void testElectionScheduledWhenAnyRaftRPCReceived(){
962 MockRaftActorContext context = createActorContext();
963 follower = createBehavior(context);
964 follower.handleMessage(leaderActor, new RaftRPC() {
965 private static final long serialVersionUID = 1L;
968 public long getTerm() {
972 verify(follower).scheduleElection(any(FiniteDuration.class));
976 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
977 MockRaftActorContext context = createActorContext();
978 follower = createBehavior(context);
979 follower.handleMessage(leaderActor, "non-raft-rpc");
980 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
983 public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
984 int snapshotLength = bs.size();
986 int size = chunkSize;
987 if (chunkSize > snapshotLength) {
988 size = snapshotLength;
990 if ((start + chunkSize) > snapshotLength) {
991 size = snapshotLength - start;
995 byte[] nextChunk = new byte[size];
996 bs.copyTo(nextChunk, start, 0, size);
1000 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1001 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1002 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1005 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1006 String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1007 boolean expForceInstallSnapshot) {
1009 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1010 AppendEntriesReply.class);
1012 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1013 assertEquals("getTerm", expTerm, reply.getTerm());
1014 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1015 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1016 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1017 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1018 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1022 private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1023 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1024 new MockRaftActorContext.MockPayload(data));
1027 private ByteString createSnapshot(){
1028 HashMap<String, String> followerSnapshot = new HashMap<>();
1029 followerSnapshot.put("1", "A");
1030 followerSnapshot.put("2", "B");
1031 followerSnapshot.put("3", "C");
1033 return toByteString(followerSnapshot);
1037 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1038 ActorRef actorRef, RaftRPC rpc) throws Exception {
1039 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1041 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1042 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1046 protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1048 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1049 assertEquals("isSuccess", true, reply.isSuccess());