2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.verify;
23 import akka.actor.ActorRef;
24 import akka.actor.Props;
25 import akka.testkit.TestActorRef;
26 import com.google.common.base.Stopwatch;
27 import com.google.protobuf.ByteString;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Assert;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
41 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
42 import org.opendaylight.controller.cluster.raft.Snapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
45 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
48 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
49 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
50 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
51 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
52 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
53 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
54 import scala.concurrent.duration.FiniteDuration;
56 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
58 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
59 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
61 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
62 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
64 private Follower follower;
66 private final short payloadVersion = 5;
70 public void tearDown() throws Exception {
71 if(follower != null) {
79 protected Follower createBehavior(RaftActorContext actorContext) {
80 return spy(new Follower(actorContext));
84 protected MockRaftActorContext createActorContext() {
85 return createActorContext(followerActor);
89 protected MockRaftActorContext createActorContext(ActorRef actorRef){
90 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
91 context.setPayloadVersion(payloadVersion );
96 public void testThatAnElectionTimeoutIsTriggered(){
97 MockRaftActorContext actorContext = createActorContext();
98 follower = new Follower(actorContext);
100 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
101 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
105 public void testHandleElectionTimeout(){
106 logStart("testHandleElectionTimeout");
108 follower = new Follower(createActorContext());
110 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE);
112 assertTrue(raftBehavior instanceof Candidate);
116 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
117 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
119 MockRaftActorContext context = createActorContext();
121 context.getTermInformation().update(term, null);
123 follower = createBehavior(context);
125 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
127 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
129 assertEquals("isVoteGranted", true, reply.isVoteGranted());
130 assertEquals("getTerm", term, reply.getTerm());
131 verify(follower).scheduleElection(any(FiniteDuration.class));
135 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
136 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
138 MockRaftActorContext context = createActorContext();
140 context.getTermInformation().update(term, "test");
142 follower = createBehavior(context);
144 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
146 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
148 assertEquals("isVoteGranted", false, reply.isVoteGranted());
149 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
154 public void testHandleFirstAppendEntries() throws Exception {
155 logStart("testHandleFirstAppendEntries");
157 MockRaftActorContext context = createActorContext();
158 context.getReplicatedLog().clear(0,2);
159 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
160 context.getReplicatedLog().setSnapshotIndex(99);
162 List<ReplicatedLogEntry> entries = Arrays.asList(
163 newReplicatedLogEntry(2, 101, "foo"));
165 Assert.assertEquals(1, context.getReplicatedLog().size());
167 // The new commitIndex is 101
168 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
170 follower = createBehavior(context);
171 follower.handleMessage(leaderActor, appendEntries);
173 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
174 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
176 assertFalse(syncStatus.isInitialSyncDone());
177 assertTrue("append entries reply should be true", reply.isSuccess());
181 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
182 logStart("testHandleFirstAppendEntries");
184 MockRaftActorContext context = createActorContext();
186 List<ReplicatedLogEntry> entries = Arrays.asList(
187 newReplicatedLogEntry(2, 101, "foo"));
189 // The new commitIndex is 101
190 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
192 follower = createBehavior(context);
193 follower.handleMessage(leaderActor, appendEntries);
195 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
196 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
198 assertFalse(syncStatus.isInitialSyncDone());
199 assertFalse("append entries reply should be false", reply.isSuccess());
203 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
204 logStart("testHandleFirstAppendEntries");
206 MockRaftActorContext context = createActorContext();
207 context.getReplicatedLog().clear(0,2);
208 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
209 context.getReplicatedLog().setSnapshotIndex(99);
211 List<ReplicatedLogEntry> entries = Arrays.asList(
212 newReplicatedLogEntry(2, 101, "foo"));
214 // The new commitIndex is 101
215 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
217 follower = createBehavior(context);
218 follower.handleMessage(leaderActor, appendEntries);
220 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
221 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
223 assertFalse(syncStatus.isInitialSyncDone());
224 assertTrue("append entries reply should be true", reply.isSuccess());
228 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
229 logStart("testHandleFirstAppendEntries");
231 MockRaftActorContext context = createActorContext();
232 context.getReplicatedLog().clear(0,2);
233 context.getReplicatedLog().setSnapshotIndex(100);
235 List<ReplicatedLogEntry> entries = Arrays.asList(
236 newReplicatedLogEntry(2, 101, "foo"));
238 // The new commitIndex is 101
239 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
241 follower = createBehavior(context);
242 follower.handleMessage(leaderActor, appendEntries);
244 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
245 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
247 assertFalse(syncStatus.isInitialSyncDone());
248 assertTrue("append entries reply should be true", reply.isSuccess());
252 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
253 logStart("testHandleFirstAppendEntries");
255 MockRaftActorContext context = createActorContext();
256 context.getReplicatedLog().clear(0,2);
257 context.getReplicatedLog().setSnapshotIndex(100);
259 List<ReplicatedLogEntry> entries = Arrays.asList(
260 newReplicatedLogEntry(2, 105, "foo"));
262 // The new commitIndex is 101
263 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
265 follower = createBehavior(context);
266 follower.handleMessage(leaderActor, appendEntries);
268 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
269 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
271 assertFalse(syncStatus.isInitialSyncDone());
272 assertFalse("append entries reply should be false", reply.isSuccess());
276 public void testHandleSyncUpAppendEntries() throws Exception {
277 logStart("testHandleSyncUpAppendEntries");
279 MockRaftActorContext context = createActorContext();
281 List<ReplicatedLogEntry> entries = Arrays.asList(
282 newReplicatedLogEntry(2, 101, "foo"));
284 // The new commitIndex is 101
285 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
287 follower = createBehavior(context);
288 follower.handleMessage(leaderActor, appendEntries);
290 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
292 assertFalse(syncStatus.isInitialSyncDone());
294 // Clear all the messages
295 followerActor.underlyingActor().clear();
297 context.setLastApplied(101);
298 context.setCommitIndex(101);
299 setLastLogEntry(context, 1, 101,
300 new MockRaftActorContext.MockPayload(""));
302 entries = Arrays.asList(
303 newReplicatedLogEntry(2, 101, "foo"));
305 // The new commitIndex is 101
306 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
307 follower.handleMessage(leaderActor, appendEntries);
309 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
311 assertTrue(syncStatus.isInitialSyncDone());
313 followerActor.underlyingActor().clear();
315 // Sending the same message again should not generate another message
317 follower.handleMessage(leaderActor, appendEntries);
319 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
321 assertNull(syncStatus);
326 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
327 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
329 MockRaftActorContext context = createActorContext();
331 List<ReplicatedLogEntry> entries = Arrays.asList(
332 newReplicatedLogEntry(2, 101, "foo"));
334 // The new commitIndex is 101
335 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
337 follower = createBehavior(context);
338 follower.handleMessage(leaderActor, appendEntries);
340 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
342 assertFalse(syncStatus.isInitialSyncDone());
344 // Clear all the messages
345 followerActor.underlyingActor().clear();
347 context.setLastApplied(100);
348 setLastLogEntry(context, 1, 100,
349 new MockRaftActorContext.MockPayload(""));
351 entries = Arrays.asList(
352 newReplicatedLogEntry(2, 101, "foo"));
354 // leader-2 is becoming the leader now and it says the commitIndex is 45
355 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
356 follower.handleMessage(leaderActor, appendEntries);
358 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
360 // We get a new message saying initial status is not done
361 assertFalse(syncStatus.isInitialSyncDone());
367 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
368 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
370 MockRaftActorContext context = createActorContext();
372 List<ReplicatedLogEntry> entries = Arrays.asList(
373 newReplicatedLogEntry(2, 101, "foo"));
375 // The new commitIndex is 101
376 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
378 follower = createBehavior(context);
379 follower.handleMessage(leaderActor, appendEntries);
381 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
383 assertFalse(syncStatus.isInitialSyncDone());
385 // Clear all the messages
386 followerActor.underlyingActor().clear();
388 context.setLastApplied(101);
389 context.setCommitIndex(101);
390 setLastLogEntry(context, 1, 101,
391 new MockRaftActorContext.MockPayload(""));
393 entries = Arrays.asList(
394 newReplicatedLogEntry(2, 101, "foo"));
396 // The new commitIndex is 101
397 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
398 follower.handleMessage(leaderActor, appendEntries);
400 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
402 assertTrue(syncStatus.isInitialSyncDone());
404 // Clear all the messages
405 followerActor.underlyingActor().clear();
407 context.setLastApplied(100);
408 setLastLogEntry(context, 1, 100,
409 new MockRaftActorContext.MockPayload(""));
411 entries = Arrays.asList(
412 newReplicatedLogEntry(2, 101, "foo"));
414 // leader-2 is becoming the leader now and it says the commitIndex is 45
415 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
416 follower.handleMessage(leaderActor, appendEntries);
418 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
420 // We get a new message saying initial status is not done
421 assertFalse(syncStatus.isInitialSyncDone());
427 * This test verifies that when an AppendEntries RPC is received by a RaftActor
428 * with a commitIndex that is greater than what has been applied to the
429 * state machine of the RaftActor, the RaftActor applies the state and
430 * sets it current applied state to the commitIndex of the sender.
435 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
436 logStart("testHandleAppendEntriesWithNewerCommitIndex");
438 MockRaftActorContext context = createActorContext();
440 context.setLastApplied(100);
441 setLastLogEntry(context, 1, 100,
442 new MockRaftActorContext.MockPayload(""));
443 context.getReplicatedLog().setSnapshotIndex(99);
445 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
446 newReplicatedLogEntry(2, 101, "foo"));
448 // The new commitIndex is 101
449 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
451 follower = createBehavior(context);
452 follower.handleMessage(leaderActor, appendEntries);
454 assertEquals("getLastApplied", 101L, context.getLastApplied());
458 * This test verifies that when an AppendEntries is received a specific prevLogTerm
459 * which does not match the term that is in RaftActors log entry at prevLogIndex
460 * then the RaftActor does not change it's state and it returns a failure.
465 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
466 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
468 MockRaftActorContext context = createActorContext();
470 // First set the receivers term to lower number
471 context.getTermInformation().update(95, "test");
473 // AppendEntries is now sent with a bigger term
474 // this will set the receivers term to be the same as the sender's term
475 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
477 follower = createBehavior(context);
479 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
481 Assert.assertSame(follower, newBehavior);
483 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
484 AppendEntriesReply.class);
486 assertEquals("isSuccess", false, reply.isSuccess());
490 * This test verifies that when a new AppendEntries message is received with
491 * new entries and the logs of the sender and receiver match that the new
492 * entries get added to the log and the log is incremented by the number of
493 * entries received in appendEntries
498 public void testHandleAppendEntriesAddNewEntries() {
499 logStart("testHandleAppendEntriesAddNewEntries");
501 MockRaftActorContext context = createActorContext();
503 // First set the receivers term to lower number
504 context.getTermInformation().update(1, "test");
506 // Prepare the receivers log
507 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
508 log.append(newReplicatedLogEntry(1, 0, "zero"));
509 log.append(newReplicatedLogEntry(1, 1, "one"));
510 log.append(newReplicatedLogEntry(1, 2, "two"));
512 context.setReplicatedLog(log);
514 // Prepare the entries to be sent with AppendEntries
515 List<ReplicatedLogEntry> entries = new ArrayList<>();
516 entries.add(newReplicatedLogEntry(1, 3, "three"));
517 entries.add(newReplicatedLogEntry(1, 4, "four"));
519 // Send appendEntries with the same term as was set on the receiver
520 // before the new behavior was created (1 in this case)
521 // This will not work for a Candidate because as soon as a Candidate
522 // is created it increments the term
523 short leaderPayloadVersion = 10;
524 String leaderId = "leader-1";
525 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
527 follower = createBehavior(context);
529 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
531 Assert.assertSame(follower, newBehavior);
533 assertEquals("Next index", 5, log.last().getIndex() + 1);
534 assertEquals("Entry 3", entries.get(0), log.get(3));
535 assertEquals("Entry 4", entries.get(1), log.get(4));
537 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
538 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
540 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
544 * This test verifies that when a new AppendEntries message is received with
545 * new entries and the logs of the sender and receiver are out-of-sync that
546 * the log is first corrected by removing the out of sync entries from the
547 * log and then adding in the new entries sent with the AppendEntries message
550 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
551 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
553 MockRaftActorContext context = createActorContext();
555 // First set the receivers term to lower number
556 context.getTermInformation().update(1, "test");
558 // Prepare the receivers log
559 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
560 log.append(newReplicatedLogEntry(1, 0, "zero"));
561 log.append(newReplicatedLogEntry(1, 1, "one"));
562 log.append(newReplicatedLogEntry(1, 2, "two"));
564 context.setReplicatedLog(log);
566 // Prepare the entries to be sent with AppendEntries
567 List<ReplicatedLogEntry> entries = new ArrayList<>();
568 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
569 entries.add(newReplicatedLogEntry(2, 3, "three"));
571 // Send appendEntries with the same term as was set on the receiver
572 // before the new behavior was created (1 in this case)
573 // This will not work for a Candidate because as soon as a Candidate
574 // is created it increments the term
575 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
577 follower = createBehavior(context);
579 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
581 Assert.assertSame(follower, newBehavior);
583 // The entry at index 2 will be found out-of-sync with the leader
584 // and will be removed
585 // Then the two new entries will be added to the log
586 // Thus making the log to have 4 entries
587 assertEquals("Next index", 4, log.last().getIndex() + 1);
588 //assertEquals("Entry 2", entries.get(0), log.get(2));
590 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
592 // Check that the entry at index 2 has the new data
593 assertEquals("Entry 2", entries.get(0), log.get(2));
595 assertEquals("Entry 3", entries.get(1), log.get(3));
597 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
601 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
602 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
604 MockRaftActorContext context = createActorContext();
606 // First set the receivers term to lower number
607 context.getTermInformation().update(1, "test");
609 // Prepare the receivers log
610 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
611 log.append(newReplicatedLogEntry(1, 0, "zero"));
612 log.append(newReplicatedLogEntry(1, 1, "one"));
613 log.append(newReplicatedLogEntry(1, 2, "two"));
615 context.setReplicatedLog(log);
617 // Prepare the entries to be sent with AppendEntries
618 List<ReplicatedLogEntry> entries = new ArrayList<>();
619 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
620 entries.add(newReplicatedLogEntry(2, 3, "three"));
622 // Send appendEntries with the same term as was set on the receiver
623 // before the new behavior was created (1 in this case)
624 // This will not work for a Candidate because as soon as a Candidate
625 // is created it increments the term
626 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
628 context.setRaftPolicy(createRaftPolicy(false, true));
629 follower = createBehavior(context);
631 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
633 Assert.assertSame(follower, newBehavior);
635 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
639 public void testHandleAppendEntriesPreviousLogEntryMissing(){
640 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
642 MockRaftActorContext context = createActorContext();
644 // Prepare the receivers log
645 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
646 log.append(newReplicatedLogEntry(1, 0, "zero"));
647 log.append(newReplicatedLogEntry(1, 1, "one"));
648 log.append(newReplicatedLogEntry(1, 2, "two"));
650 context.setReplicatedLog(log);
652 // Prepare the entries to be sent with AppendEntries
653 List<ReplicatedLogEntry> entries = new ArrayList<>();
654 entries.add(newReplicatedLogEntry(1, 4, "four"));
656 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
658 follower = createBehavior(context);
660 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
662 Assert.assertSame(follower, newBehavior);
664 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
668 public void testHandleAppendEntriesWithExistingLogEntry() {
669 logStart("testHandleAppendEntriesWithExistingLogEntry");
671 MockRaftActorContext context = createActorContext();
673 context.getTermInformation().update(1, "test");
675 // Prepare the receivers log
676 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
677 log.append(newReplicatedLogEntry(1, 0, "zero"));
678 log.append(newReplicatedLogEntry(1, 1, "one"));
680 context.setReplicatedLog(log);
682 // Send the last entry again.
683 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
685 follower = createBehavior(context);
687 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
689 assertEquals("Next index", 2, log.last().getIndex() + 1);
690 assertEquals("Entry 1", entries.get(0), log.get(1));
692 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
694 // Send the last entry again and also a new one.
696 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
698 leaderActor.underlyingActor().clear();
699 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
701 assertEquals("Next index", 3, log.last().getIndex() + 1);
702 assertEquals("Entry 1", entries.get(0), log.get(1));
703 assertEquals("Entry 2", entries.get(1), log.get(2));
705 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
709 public void testHandleAppendEntriesAfterInstallingSnapshot(){
710 logStart("testHandleAppendAfterInstallingSnapshot");
712 MockRaftActorContext context = createActorContext();
714 // Prepare the receivers log
715 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
717 // Set up a log as if it has been snapshotted
718 log.setSnapshotIndex(3);
719 log.setSnapshotTerm(1);
721 context.setReplicatedLog(log);
723 // Prepare the entries to be sent with AppendEntries
724 List<ReplicatedLogEntry> entries = new ArrayList<>();
725 entries.add(newReplicatedLogEntry(1, 4, "four"));
727 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
729 follower = createBehavior(context);
731 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
733 Assert.assertSame(follower, newBehavior);
735 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
740 * This test verifies that when InstallSnapshot is received by
741 * the follower its applied correctly.
746 public void testHandleInstallSnapshot() throws Exception {
747 logStart("testHandleInstallSnapshot");
749 MockRaftActorContext context = createActorContext();
750 context.getTermInformation().update(1, "leader");
752 follower = createBehavior(context);
754 ByteString bsSnapshot = createSnapshot();
756 int snapshotLength = bsSnapshot.size();
758 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
759 int lastIncludedIndex = 1;
761 InstallSnapshot lastInstallSnapshot = null;
763 for(int i = 0; i < totalChunks; i++) {
764 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
765 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
766 chunkData, chunkIndex, totalChunks);
767 follower.handleMessage(leaderActor, lastInstallSnapshot);
768 offset = offset + 50;
773 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
774 ApplySnapshot.class);
775 Snapshot snapshot = applySnapshot.getSnapshot();
776 assertNotNull(lastInstallSnapshot);
777 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
778 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
779 snapshot.getLastAppliedTerm());
780 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
781 snapshot.getLastAppliedIndex());
782 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
783 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
784 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
785 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
786 applySnapshot.getCallback().onSuccess();
788 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
789 leaderActor, InstallSnapshotReply.class);
790 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
793 for(InstallSnapshotReply reply: replies) {
794 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
795 assertEquals("getTerm", 1, reply.getTerm());
796 assertEquals("isSuccess", true, reply.isSuccess());
797 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
800 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
805 * Verify that when an AppendEntries is sent to a follower during a snapshot install
806 * the Follower short-circuits the processing of the AppendEntries message.
811 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
812 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
814 MockRaftActorContext context = createActorContext();
816 follower = createBehavior(context);
818 ByteString bsSnapshot = createSnapshot();
819 int snapshotLength = bsSnapshot.size();
821 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
822 int lastIncludedIndex = 1;
824 // Check that snapshot installation is not in progress
825 assertNull(follower.getSnapshotTracker());
827 // Make sure that we have more than 1 chunk to send
828 assertTrue(totalChunks > 1);
830 // Send an install snapshot with the first chunk to start the process of installing a snapshot
831 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
832 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
833 chunkData, 1, totalChunks));
835 // Check if snapshot installation is in progress now
836 assertNotNull(follower.getSnapshotTracker());
838 // Send an append entry
839 AppendEntries appendEntries = mock(AppendEntries.class);
840 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
842 follower.handleMessage(leaderActor, appendEntries);
844 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
845 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
846 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
847 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
849 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
850 verify(appendEntries, never()).getPrevLogIndex();
855 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
856 logStart("testInitialSyncUpWithHandleInstallSnapshot");
858 MockRaftActorContext context = createActorContext();
860 follower = createBehavior(context);
862 ByteString bsSnapshot = createSnapshot();
864 int snapshotLength = bsSnapshot.size();
866 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
867 int lastIncludedIndex = 1;
869 InstallSnapshot lastInstallSnapshot = null;
871 for(int i = 0; i < totalChunks; i++) {
872 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
873 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
874 chunkData, chunkIndex, totalChunks);
875 follower.handleMessage(leaderActor, lastInstallSnapshot);
876 offset = offset + 50;
881 FollowerInitialSyncUpStatus syncStatus =
882 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
884 assertFalse(syncStatus.isInitialSyncDone());
886 // Clear all the messages
887 followerActor.underlyingActor().clear();
889 context.setLastApplied(101);
890 context.setCommitIndex(101);
891 setLastLogEntry(context, 1, 101,
892 new MockRaftActorContext.MockPayload(""));
894 List<ReplicatedLogEntry> entries = Arrays.asList(
895 newReplicatedLogEntry(2, 101, "foo"));
897 // The new commitIndex is 101
898 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
899 follower.handleMessage(leaderActor, appendEntries);
901 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
903 assertTrue(syncStatus.isInitialSyncDone());
907 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
908 logStart("testHandleOutOfSequenceInstallSnapshot");
910 MockRaftActorContext context = createActorContext();
912 follower = createBehavior(context);
914 ByteString bsSnapshot = createSnapshot();
916 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
917 getNextChunk(bsSnapshot, 10, 50), 3, 3);
918 follower.handleMessage(leaderActor, installSnapshot);
920 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
921 InstallSnapshotReply.class);
923 assertEquals("isSuccess", false, reply.isSuccess());
924 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
925 assertEquals("getTerm", 1, reply.getTerm());
926 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
928 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
932 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
933 MockRaftActorContext context = createActorContext();
935 Stopwatch stopwatch = Stopwatch.createStarted();
937 follower = createBehavior(context);
939 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
940 ElectionTimeout.class);
942 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
944 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
946 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
947 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
951 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){
952 MockRaftActorContext context = createActorContext();
953 context.setConfigParams(new DefaultConfigParamsImpl(){
955 public FiniteDuration getElectionTimeOutInterval() {
956 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
960 context.setRaftPolicy(createRaftPolicy(false, false));
962 follower = createBehavior(context);
964 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
965 ElectionTimeout.class);
966 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
967 assertSame("handleMessage result", follower, newBehavior);
971 public void testFollowerSchedulesElectionIfNonVoting(){
972 MockRaftActorContext context = createActorContext();
973 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
974 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
975 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
976 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
978 follower = new Follower(context, "leader", (short)1);
980 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
981 ElectionTimeout.class);
982 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
983 assertSame("handleMessage result", follower, newBehavior);
984 assertNull("Expected null leaderId", follower.getLeaderId());
988 public void testElectionScheduledWhenAnyRaftRPCReceived(){
989 MockRaftActorContext context = createActorContext();
990 follower = createBehavior(context);
991 follower.handleMessage(leaderActor, new RaftRPC() {
992 private static final long serialVersionUID = 1L;
995 public long getTerm() {
999 verify(follower).scheduleElection(any(FiniteDuration.class));
1003 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
1004 MockRaftActorContext context = createActorContext();
1005 follower = createBehavior(context);
1006 follower.handleMessage(leaderActor, "non-raft-rpc");
1007 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1010 public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
1011 int snapshotLength = bs.size();
1013 int size = chunkSize;
1014 if (chunkSize > snapshotLength) {
1015 size = snapshotLength;
1017 if ((start + chunkSize) > snapshotLength) {
1018 size = snapshotLength - start;
1022 byte[] nextChunk = new byte[size];
1023 bs.copyTo(nextChunk, start, 0, size);
1027 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1028 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1029 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1032 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1033 String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1034 boolean expForceInstallSnapshot) {
1036 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1037 AppendEntriesReply.class);
1039 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1040 assertEquals("getTerm", expTerm, reply.getTerm());
1041 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1042 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1043 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1044 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1045 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1049 private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1050 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1051 new MockRaftActorContext.MockPayload(data));
1054 private ByteString createSnapshot(){
1055 HashMap<String, String> followerSnapshot = new HashMap<>();
1056 followerSnapshot.put("1", "A");
1057 followerSnapshot.put("2", "B");
1058 followerSnapshot.put("3", "C");
1060 return toByteString(followerSnapshot);
1064 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1065 ActorRef actorRef, RaftRPC rpc) throws Exception {
1066 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1068 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1069 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1073 protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1075 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1076 assertEquals("isSuccess", true, reply.isSuccess());