2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.verify;
23 import akka.actor.ActorRef;
24 import akka.actor.Props;
25 import akka.testkit.TestActorRef;
26 import com.google.common.base.Stopwatch;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.google.protobuf.ByteString;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.Assert;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
39 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorContext;
41 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
42 import org.opendaylight.controller.cluster.raft.Snapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
45 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
46 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
49 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
51 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
52 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
53 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
54 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
55 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
56 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
57 import scala.concurrent.duration.FiniteDuration;
59 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
61 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
62 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
64 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
65 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
67 private Follower follower;
69 private final short payloadVersion = 5;
73 public void tearDown() throws Exception {
74 if(follower != null) {
82 protected Follower createBehavior(RaftActorContext actorContext) {
83 return spy(new Follower(actorContext));
87 protected MockRaftActorContext createActorContext() {
88 return createActorContext(followerActor);
92 protected MockRaftActorContext createActorContext(ActorRef actorRef){
93 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
94 context.setPayloadVersion(payloadVersion );
99 public void testThatAnElectionTimeoutIsTriggered(){
100 MockRaftActorContext actorContext = createActorContext();
101 follower = new Follower(actorContext);
103 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
104 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
108 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
109 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
111 MockRaftActorContext context = createActorContext();
112 follower = new Follower(context);
114 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
115 TimeUnit.MILLISECONDS);
116 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
118 assertTrue(raftBehavior instanceof Candidate);
122 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
123 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
125 MockRaftActorContext context = createActorContext();
126 ((DefaultConfigParamsImpl) context.getConfigParams()).
127 setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
128 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
130 follower = new Follower(context);
131 context.setCurrentBehavior(follower);
133 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
134 getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
135 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
138 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
139 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
140 assertTrue(raftBehavior instanceof Follower);
142 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
143 getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
144 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
147 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
148 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
149 assertTrue(raftBehavior instanceof Follower);
153 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
154 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
156 MockRaftActorContext context = createActorContext();
158 context.getTermInformation().update(term, null);
160 follower = createBehavior(context);
162 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
164 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
166 assertEquals("isVoteGranted", true, reply.isVoteGranted());
167 assertEquals("getTerm", term, reply.getTerm());
168 verify(follower).scheduleElection(any(FiniteDuration.class));
172 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
173 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
175 MockRaftActorContext context = createActorContext();
177 context.getTermInformation().update(term, "test");
179 follower = createBehavior(context);
181 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
183 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
185 assertEquals("isVoteGranted", false, reply.isVoteGranted());
186 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
191 public void testHandleFirstAppendEntries() throws Exception {
192 logStart("testHandleFirstAppendEntries");
194 MockRaftActorContext context = createActorContext();
195 context.getReplicatedLog().clear(0,2);
196 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
197 context.getReplicatedLog().setSnapshotIndex(99);
199 List<ReplicatedLogEntry> entries = Arrays.asList(
200 newReplicatedLogEntry(2, 101, "foo"));
202 Assert.assertEquals(1, context.getReplicatedLog().size());
204 // The new commitIndex is 101
205 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
207 follower = createBehavior(context);
208 follower.handleMessage(leaderActor, appendEntries);
210 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
211 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
213 assertFalse(syncStatus.isInitialSyncDone());
214 assertTrue("append entries reply should be true", reply.isSuccess());
218 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
219 logStart("testHandleFirstAppendEntries");
221 MockRaftActorContext context = createActorContext();
223 List<ReplicatedLogEntry> entries = Arrays.asList(
224 newReplicatedLogEntry(2, 101, "foo"));
226 // The new commitIndex is 101
227 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
229 follower = createBehavior(context);
230 follower.handleMessage(leaderActor, appendEntries);
232 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
233 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
235 assertFalse(syncStatus.isInitialSyncDone());
236 assertFalse("append entries reply should be false", reply.isSuccess());
240 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
241 logStart("testHandleFirstAppendEntries");
243 MockRaftActorContext context = createActorContext();
244 context.getReplicatedLog().clear(0,2);
245 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
246 context.getReplicatedLog().setSnapshotIndex(99);
248 List<ReplicatedLogEntry> entries = Arrays.asList(
249 newReplicatedLogEntry(2, 101, "foo"));
251 // The new commitIndex is 101
252 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
254 follower = createBehavior(context);
255 follower.handleMessage(leaderActor, appendEntries);
257 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
258 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
260 assertFalse(syncStatus.isInitialSyncDone());
261 assertTrue("append entries reply should be true", reply.isSuccess());
265 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
266 logStart("testHandleFirstAppendEntries");
268 MockRaftActorContext context = createActorContext();
269 context.getReplicatedLog().clear(0,2);
270 context.getReplicatedLog().setSnapshotIndex(100);
272 List<ReplicatedLogEntry> entries = Arrays.asList(
273 newReplicatedLogEntry(2, 101, "foo"));
275 // The new commitIndex is 101
276 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
278 follower = createBehavior(context);
279 follower.handleMessage(leaderActor, appendEntries);
281 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
282 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
284 assertFalse(syncStatus.isInitialSyncDone());
285 assertTrue("append entries reply should be true", reply.isSuccess());
289 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
290 logStart("testHandleFirstAppendEntries");
292 MockRaftActorContext context = createActorContext();
293 context.getReplicatedLog().clear(0,2);
294 context.getReplicatedLog().setSnapshotIndex(100);
296 List<ReplicatedLogEntry> entries = Arrays.asList(
297 newReplicatedLogEntry(2, 105, "foo"));
299 // The new commitIndex is 101
300 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
302 follower = createBehavior(context);
303 follower.handleMessage(leaderActor, appendEntries);
305 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
306 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
308 assertFalse(syncStatus.isInitialSyncDone());
309 assertFalse("append entries reply should be false", reply.isSuccess());
313 public void testHandleSyncUpAppendEntries() throws Exception {
314 logStart("testHandleSyncUpAppendEntries");
316 MockRaftActorContext context = createActorContext();
318 List<ReplicatedLogEntry> entries = Arrays.asList(
319 newReplicatedLogEntry(2, 101, "foo"));
321 // The new commitIndex is 101
322 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
324 follower = createBehavior(context);
325 follower.handleMessage(leaderActor, appendEntries);
327 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
329 assertFalse(syncStatus.isInitialSyncDone());
331 // Clear all the messages
332 followerActor.underlyingActor().clear();
334 context.setLastApplied(101);
335 context.setCommitIndex(101);
336 setLastLogEntry(context, 1, 101,
337 new MockRaftActorContext.MockPayload(""));
339 entries = Arrays.asList(
340 newReplicatedLogEntry(2, 101, "foo"));
342 // The new commitIndex is 101
343 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
344 follower.handleMessage(leaderActor, appendEntries);
346 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
348 assertTrue(syncStatus.isInitialSyncDone());
350 followerActor.underlyingActor().clear();
352 // Sending the same message again should not generate another message
354 follower.handleMessage(leaderActor, appendEntries);
356 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
358 assertNull(syncStatus);
363 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
364 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
366 MockRaftActorContext context = createActorContext();
368 List<ReplicatedLogEntry> entries = Arrays.asList(
369 newReplicatedLogEntry(2, 101, "foo"));
371 // The new commitIndex is 101
372 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
374 follower = createBehavior(context);
375 follower.handleMessage(leaderActor, appendEntries);
377 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
379 assertFalse(syncStatus.isInitialSyncDone());
381 // Clear all the messages
382 followerActor.underlyingActor().clear();
384 context.setLastApplied(100);
385 setLastLogEntry(context, 1, 100,
386 new MockRaftActorContext.MockPayload(""));
388 entries = Arrays.asList(
389 newReplicatedLogEntry(2, 101, "foo"));
391 // leader-2 is becoming the leader now and it says the commitIndex is 45
392 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
393 follower.handleMessage(leaderActor, appendEntries);
395 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
397 // We get a new message saying initial status is not done
398 assertFalse(syncStatus.isInitialSyncDone());
404 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
405 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
407 MockRaftActorContext context = createActorContext();
409 List<ReplicatedLogEntry> entries = Arrays.asList(
410 newReplicatedLogEntry(2, 101, "foo"));
412 // The new commitIndex is 101
413 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
415 follower = createBehavior(context);
416 follower.handleMessage(leaderActor, appendEntries);
418 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
420 assertFalse(syncStatus.isInitialSyncDone());
422 // Clear all the messages
423 followerActor.underlyingActor().clear();
425 context.setLastApplied(101);
426 context.setCommitIndex(101);
427 setLastLogEntry(context, 1, 101,
428 new MockRaftActorContext.MockPayload(""));
430 entries = Arrays.asList(
431 newReplicatedLogEntry(2, 101, "foo"));
433 // The new commitIndex is 101
434 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
435 follower.handleMessage(leaderActor, appendEntries);
437 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
439 assertTrue(syncStatus.isInitialSyncDone());
441 // Clear all the messages
442 followerActor.underlyingActor().clear();
444 context.setLastApplied(100);
445 setLastLogEntry(context, 1, 100,
446 new MockRaftActorContext.MockPayload(""));
448 entries = Arrays.asList(
449 newReplicatedLogEntry(2, 101, "foo"));
451 // leader-2 is becoming the leader now and it says the commitIndex is 45
452 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
453 follower.handleMessage(leaderActor, appendEntries);
455 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
457 // We get a new message saying initial status is not done
458 assertFalse(syncStatus.isInitialSyncDone());
464 * This test verifies that when an AppendEntries RPC is received by a RaftActor
465 * with a commitIndex that is greater than what has been applied to the
466 * state machine of the RaftActor, the RaftActor applies the state and
467 * sets it current applied state to the commitIndex of the sender.
472 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
473 logStart("testHandleAppendEntriesWithNewerCommitIndex");
475 MockRaftActorContext context = createActorContext();
477 context.setLastApplied(100);
478 setLastLogEntry(context, 1, 100,
479 new MockRaftActorContext.MockPayload(""));
480 context.getReplicatedLog().setSnapshotIndex(99);
482 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
483 newReplicatedLogEntry(2, 101, "foo"));
485 // The new commitIndex is 101
486 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
488 follower = createBehavior(context);
489 follower.handleMessage(leaderActor, appendEntries);
491 assertEquals("getLastApplied", 101L, context.getLastApplied());
495 * This test verifies that when an AppendEntries is received a specific prevLogTerm
496 * which does not match the term that is in RaftActors log entry at prevLogIndex
497 * then the RaftActor does not change it's state and it returns a failure.
502 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
503 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
505 MockRaftActorContext context = createActorContext();
507 // First set the receivers term to lower number
508 context.getTermInformation().update(95, "test");
510 // AppendEntries is now sent with a bigger term
511 // this will set the receivers term to be the same as the sender's term
512 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
514 follower = createBehavior(context);
516 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
518 Assert.assertSame(follower, newBehavior);
520 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
521 AppendEntriesReply.class);
523 assertEquals("isSuccess", false, reply.isSuccess());
527 * This test verifies that when a new AppendEntries message is received with
528 * new entries and the logs of the sender and receiver match that the new
529 * entries get added to the log and the log is incremented by the number of
530 * entries received in appendEntries
535 public void testHandleAppendEntriesAddNewEntries() {
536 logStart("testHandleAppendEntriesAddNewEntries");
538 MockRaftActorContext context = createActorContext();
540 // First set the receivers term to lower number
541 context.getTermInformation().update(1, "test");
543 // Prepare the receivers log
544 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
545 log.append(newReplicatedLogEntry(1, 0, "zero"));
546 log.append(newReplicatedLogEntry(1, 1, "one"));
547 log.append(newReplicatedLogEntry(1, 2, "two"));
549 context.setReplicatedLog(log);
551 // Prepare the entries to be sent with AppendEntries
552 List<ReplicatedLogEntry> entries = new ArrayList<>();
553 entries.add(newReplicatedLogEntry(1, 3, "three"));
554 entries.add(newReplicatedLogEntry(1, 4, "four"));
556 // Send appendEntries with the same term as was set on the receiver
557 // before the new behavior was created (1 in this case)
558 // This will not work for a Candidate because as soon as a Candidate
559 // is created it increments the term
560 short leaderPayloadVersion = 10;
561 String leaderId = "leader-1";
562 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
564 follower = createBehavior(context);
566 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
568 Assert.assertSame(follower, newBehavior);
570 assertEquals("Next index", 5, log.last().getIndex() + 1);
571 assertEquals("Entry 3", entries.get(0), log.get(3));
572 assertEquals("Entry 4", entries.get(1), log.get(4));
574 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
575 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
577 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
581 * This test verifies that when a new AppendEntries message is received with
582 * new entries and the logs of the sender and receiver are out-of-sync that
583 * the log is first corrected by removing the out of sync entries from the
584 * log and then adding in the new entries sent with the AppendEntries message
587 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
588 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
590 MockRaftActorContext context = createActorContext();
592 // First set the receivers term to lower number
593 context.getTermInformation().update(1, "test");
595 // Prepare the receivers log
596 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
597 log.append(newReplicatedLogEntry(1, 0, "zero"));
598 log.append(newReplicatedLogEntry(1, 1, "one"));
599 log.append(newReplicatedLogEntry(1, 2, "two"));
601 context.setReplicatedLog(log);
603 // Prepare the entries to be sent with AppendEntries
604 List<ReplicatedLogEntry> entries = new ArrayList<>();
605 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
606 entries.add(newReplicatedLogEntry(2, 3, "three"));
608 // Send appendEntries with the same term as was set on the receiver
609 // before the new behavior was created (1 in this case)
610 // This will not work for a Candidate because as soon as a Candidate
611 // is created it increments the term
612 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
614 follower = createBehavior(context);
616 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
618 Assert.assertSame(follower, newBehavior);
620 // The entry at index 2 will be found out-of-sync with the leader
621 // and will be removed
622 // Then the two new entries will be added to the log
623 // Thus making the log to have 4 entries
624 assertEquals("Next index", 4, log.last().getIndex() + 1);
625 //assertEquals("Entry 2", entries.get(0), log.get(2));
627 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
629 // Check that the entry at index 2 has the new data
630 assertEquals("Entry 2", entries.get(0), log.get(2));
632 assertEquals("Entry 3", entries.get(1), log.get(3));
634 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
638 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
639 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
641 MockRaftActorContext context = createActorContext();
643 // First set the receivers term to lower number
644 context.getTermInformation().update(1, "test");
646 // Prepare the receivers log
647 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
648 log.append(newReplicatedLogEntry(1, 0, "zero"));
649 log.append(newReplicatedLogEntry(1, 1, "one"));
650 log.append(newReplicatedLogEntry(1, 2, "two"));
652 context.setReplicatedLog(log);
654 // Prepare the entries to be sent with AppendEntries
655 List<ReplicatedLogEntry> entries = new ArrayList<>();
656 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
657 entries.add(newReplicatedLogEntry(2, 3, "three"));
659 // Send appendEntries with the same term as was set on the receiver
660 // before the new behavior was created (1 in this case)
661 // This will not work for a Candidate because as soon as a Candidate
662 // is created it increments the term
663 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
665 context.setRaftPolicy(createRaftPolicy(false, true));
666 follower = createBehavior(context);
668 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
670 Assert.assertSame(follower, newBehavior);
672 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
676 public void testHandleAppendEntriesPreviousLogEntryMissing(){
677 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
679 MockRaftActorContext context = createActorContext();
681 // Prepare the receivers log
682 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
683 log.append(newReplicatedLogEntry(1, 0, "zero"));
684 log.append(newReplicatedLogEntry(1, 1, "one"));
685 log.append(newReplicatedLogEntry(1, 2, "two"));
687 context.setReplicatedLog(log);
689 // Prepare the entries to be sent with AppendEntries
690 List<ReplicatedLogEntry> entries = new ArrayList<>();
691 entries.add(newReplicatedLogEntry(1, 4, "four"));
693 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
695 follower = createBehavior(context);
697 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
699 Assert.assertSame(follower, newBehavior);
701 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
705 public void testHandleAppendEntriesWithExistingLogEntry() {
706 logStart("testHandleAppendEntriesWithExistingLogEntry");
708 MockRaftActorContext context = createActorContext();
710 context.getTermInformation().update(1, "test");
712 // Prepare the receivers log
713 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
714 log.append(newReplicatedLogEntry(1, 0, "zero"));
715 log.append(newReplicatedLogEntry(1, 1, "one"));
717 context.setReplicatedLog(log);
719 // Send the last entry again.
720 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
722 follower = createBehavior(context);
724 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
726 assertEquals("Next index", 2, log.last().getIndex() + 1);
727 assertEquals("Entry 1", entries.get(0), log.get(1));
729 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
731 // Send the last entry again and also a new one.
733 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
735 leaderActor.underlyingActor().clear();
736 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
738 assertEquals("Next index", 3, log.last().getIndex() + 1);
739 assertEquals("Entry 1", entries.get(0), log.get(1));
740 assertEquals("Entry 2", entries.get(1), log.get(2));
742 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
746 public void testHandleAppendEntriesAfterInstallingSnapshot(){
747 logStart("testHandleAppendAfterInstallingSnapshot");
749 MockRaftActorContext context = createActorContext();
751 // Prepare the receivers log
752 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
754 // Set up a log as if it has been snapshotted
755 log.setSnapshotIndex(3);
756 log.setSnapshotTerm(1);
758 context.setReplicatedLog(log);
760 // Prepare the entries to be sent with AppendEntries
761 List<ReplicatedLogEntry> entries = new ArrayList<>();
762 entries.add(newReplicatedLogEntry(1, 4, "four"));
764 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
766 follower = createBehavior(context);
768 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
770 Assert.assertSame(follower, newBehavior);
772 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
777 * This test verifies that when InstallSnapshot is received by
778 * the follower its applied correctly.
783 public void testHandleInstallSnapshot() throws Exception {
784 logStart("testHandleInstallSnapshot");
786 MockRaftActorContext context = createActorContext();
787 context.getTermInformation().update(1, "leader");
789 follower = createBehavior(context);
791 ByteString bsSnapshot = createSnapshot();
793 int snapshotLength = bsSnapshot.size();
795 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
796 int lastIncludedIndex = 1;
798 InstallSnapshot lastInstallSnapshot = null;
800 for(int i = 0; i < totalChunks; i++) {
801 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
802 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
803 chunkData, chunkIndex, totalChunks);
804 follower.handleMessage(leaderActor, lastInstallSnapshot);
805 offset = offset + 50;
810 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
811 ApplySnapshot.class);
812 Snapshot snapshot = applySnapshot.getSnapshot();
813 assertNotNull(lastInstallSnapshot);
814 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
815 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
816 snapshot.getLastAppliedTerm());
817 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
818 snapshot.getLastAppliedIndex());
819 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
820 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
821 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
822 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
823 applySnapshot.getCallback().onSuccess();
825 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
826 leaderActor, InstallSnapshotReply.class);
827 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
830 for(InstallSnapshotReply reply: replies) {
831 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
832 assertEquals("getTerm", 1, reply.getTerm());
833 assertEquals("isSuccess", true, reply.isSuccess());
834 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
837 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
842 * Verify that when an AppendEntries is sent to a follower during a snapshot install
843 * the Follower short-circuits the processing of the AppendEntries message.
848 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
849 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
851 MockRaftActorContext context = createActorContext();
853 follower = createBehavior(context);
855 ByteString bsSnapshot = createSnapshot();
856 int snapshotLength = bsSnapshot.size();
858 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
859 int lastIncludedIndex = 1;
861 // Check that snapshot installation is not in progress
862 assertNull(follower.getSnapshotTracker());
864 // Make sure that we have more than 1 chunk to send
865 assertTrue(totalChunks > 1);
867 // Send an install snapshot with the first chunk to start the process of installing a snapshot
868 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
869 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
870 chunkData, 1, totalChunks));
872 // Check if snapshot installation is in progress now
873 assertNotNull(follower.getSnapshotTracker());
875 // Send an append entry
876 AppendEntries appendEntries = mock(AppendEntries.class);
877 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
879 follower.handleMessage(leaderActor, appendEntries);
881 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
882 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
883 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
884 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
886 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
887 verify(appendEntries, never()).getPrevLogIndex();
892 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
893 logStart("testInitialSyncUpWithHandleInstallSnapshot");
895 MockRaftActorContext context = createActorContext();
896 context.setCommitIndex(-1);
898 follower = createBehavior(context);
900 ByteString bsSnapshot = createSnapshot();
902 int snapshotLength = bsSnapshot.size();
904 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
905 int lastIncludedIndex = 1;
907 InstallSnapshot lastInstallSnapshot = null;
909 for(int i = 0; i < totalChunks; i++) {
910 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
911 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
912 chunkData, chunkIndex, totalChunks);
913 follower.handleMessage(leaderActor, lastInstallSnapshot);
914 offset = offset + 50;
919 FollowerInitialSyncUpStatus syncStatus =
920 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
922 assertFalse(syncStatus.isInitialSyncDone());
924 // Clear all the messages
925 followerActor.underlyingActor().clear();
927 context.setLastApplied(101);
928 context.setCommitIndex(101);
929 setLastLogEntry(context, 1, 101,
930 new MockRaftActorContext.MockPayload(""));
932 List<ReplicatedLogEntry> entries = Arrays.asList(
933 newReplicatedLogEntry(2, 101, "foo"));
935 // The new commitIndex is 101
936 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
937 follower.handleMessage(leaderActor, appendEntries);
939 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
941 assertTrue(syncStatus.isInitialSyncDone());
945 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
946 logStart("testHandleOutOfSequenceInstallSnapshot");
948 MockRaftActorContext context = createActorContext();
950 follower = createBehavior(context);
952 ByteString bsSnapshot = createSnapshot();
954 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
955 getNextChunk(bsSnapshot, 10, 50), 3, 3);
956 follower.handleMessage(leaderActor, installSnapshot);
958 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
959 InstallSnapshotReply.class);
961 assertEquals("isSuccess", false, reply.isSuccess());
962 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
963 assertEquals("getTerm", 1, reply.getTerm());
964 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
966 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
970 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
971 MockRaftActorContext context = createActorContext();
973 Stopwatch stopwatch = Stopwatch.createStarted();
975 follower = createBehavior(context);
977 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
979 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
981 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
983 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
984 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
988 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){
989 MockRaftActorContext context = createActorContext();
990 context.setConfigParams(new DefaultConfigParamsImpl(){
992 public FiniteDuration getElectionTimeOutInterval() {
993 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
997 context.setRaftPolicy(createRaftPolicy(false, false));
999 follower = createBehavior(context);
1001 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1002 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1003 assertSame("handleMessage result", follower, newBehavior);
1007 public void testFollowerSchedulesElectionIfNonVoting(){
1008 MockRaftActorContext context = createActorContext();
1009 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1010 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1011 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1012 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1014 follower = new Follower(context, "leader", (short)1);
1016 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1017 ElectionTimeout.class);
1018 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1019 assertSame("handleMessage result", follower, newBehavior);
1020 assertNull("Expected null leaderId", follower.getLeaderId());
1024 public void testElectionScheduledWhenAnyRaftRPCReceived(){
1025 MockRaftActorContext context = createActorContext();
1026 follower = createBehavior(context);
1027 follower.handleMessage(leaderActor, new RaftRPC() {
1028 private static final long serialVersionUID = 1L;
1031 public long getTerm() {
1035 verify(follower).scheduleElection(any(FiniteDuration.class));
1039 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
1040 MockRaftActorContext context = createActorContext();
1041 follower = createBehavior(context);
1042 follower.handleMessage(leaderActor, "non-raft-rpc");
1043 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1046 public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
1047 int snapshotLength = bs.size();
1049 int size = chunkSize;
1050 if (chunkSize > snapshotLength) {
1051 size = snapshotLength;
1053 if ((start + chunkSize) > snapshotLength) {
1054 size = snapshotLength - start;
1058 byte[] nextChunk = new byte[size];
1059 bs.copyTo(nextChunk, start, 0, size);
1063 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1064 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1065 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1068 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1069 String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1070 boolean expForceInstallSnapshot) {
1072 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1073 AppendEntriesReply.class);
1075 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1076 assertEquals("getTerm", expTerm, reply.getTerm());
1077 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1078 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1079 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1080 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1081 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1085 private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1086 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1087 new MockRaftActorContext.MockPayload(data));
1090 private ByteString createSnapshot(){
1091 HashMap<String, String> followerSnapshot = new HashMap<>();
1092 followerSnapshot.put("1", "A");
1093 followerSnapshot.put("2", "B");
1094 followerSnapshot.put("3", "C");
1096 return toByteString(followerSnapshot);
1100 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1101 ActorRef actorRef, RaftRPC rpc) throws Exception {
1102 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1104 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1105 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1109 protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1111 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1112 assertEquals("isSuccess", true, reply.isSuccess());