2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.Props;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.google.protobuf.ByteString;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Assert;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.Snapshot;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
42 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
43 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
44 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
45 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
47 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
49 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
50 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
51 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
52 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
53 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
54 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
55 import scala.concurrent.duration.FiniteDuration;
57 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
59 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
60 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
62 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
63 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
65 private Follower follower;
67 private final short payloadVersion = 5;
71 public void tearDown() throws Exception {
72 if(follower != null) {
80 protected Follower createBehavior(RaftActorContext actorContext) {
81 return spy(new Follower(actorContext));
85 protected MockRaftActorContext createActorContext() {
86 return createActorContext(followerActor);
90 protected MockRaftActorContext createActorContext(ActorRef actorRef){
91 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
92 context.setPayloadVersion(payloadVersion );
97 public void testThatAnElectionTimeoutIsTriggered(){
98 MockRaftActorContext actorContext = createActorContext();
99 follower = new Follower(actorContext);
101 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
102 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
106 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
107 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
109 MockRaftActorContext context = createActorContext();
110 follower = new Follower(context);
112 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
113 TimeUnit.MILLISECONDS);
114 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
116 assertTrue(raftBehavior instanceof Candidate);
120 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
121 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
123 MockRaftActorContext context = createActorContext();
124 ((DefaultConfigParamsImpl) context.getConfigParams()).
125 setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
126 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
128 follower = new Follower(context);
129 context.setCurrentBehavior(follower);
131 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
132 getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
133 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
136 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
137 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
138 assertTrue(raftBehavior instanceof Follower);
140 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
141 getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
142 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
145 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
146 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
147 assertTrue(raftBehavior instanceof Follower);
151 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
152 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
154 MockRaftActorContext context = createActorContext();
156 context.getTermInformation().update(term, null);
158 follower = createBehavior(context);
160 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
162 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
164 assertEquals("isVoteGranted", true, reply.isVoteGranted());
165 assertEquals("getTerm", term, reply.getTerm());
166 verify(follower).scheduleElection(any(FiniteDuration.class));
170 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
171 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
173 MockRaftActorContext context = createActorContext();
175 context.getTermInformation().update(term, "test");
177 follower = createBehavior(context);
179 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
181 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
183 assertEquals("isVoteGranted", false, reply.isVoteGranted());
184 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
189 public void testHandleFirstAppendEntries() throws Exception {
190 logStart("testHandleFirstAppendEntries");
192 MockRaftActorContext context = createActorContext();
193 context.getReplicatedLog().clear(0,2);
194 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
195 context.getReplicatedLog().setSnapshotIndex(99);
197 List<ReplicatedLogEntry> entries = Arrays.asList(
198 newReplicatedLogEntry(2, 101, "foo"));
200 Assert.assertEquals(1, context.getReplicatedLog().size());
202 // The new commitIndex is 101
203 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
205 follower = createBehavior(context);
206 follower.handleMessage(leaderActor, appendEntries);
208 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
209 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
211 assertFalse(syncStatus.isInitialSyncDone());
212 assertTrue("append entries reply should be true", reply.isSuccess());
216 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
217 logStart("testHandleFirstAppendEntries");
219 MockRaftActorContext context = createActorContext();
221 List<ReplicatedLogEntry> entries = Arrays.asList(
222 newReplicatedLogEntry(2, 101, "foo"));
224 // The new commitIndex is 101
225 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
227 follower = createBehavior(context);
228 follower.handleMessage(leaderActor, appendEntries);
230 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
231 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
233 assertFalse(syncStatus.isInitialSyncDone());
234 assertFalse("append entries reply should be false", reply.isSuccess());
238 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
239 logStart("testHandleFirstAppendEntries");
241 MockRaftActorContext context = createActorContext();
242 context.getReplicatedLog().clear(0,2);
243 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
244 context.getReplicatedLog().setSnapshotIndex(99);
246 List<ReplicatedLogEntry> entries = Arrays.asList(
247 newReplicatedLogEntry(2, 101, "foo"));
249 // The new commitIndex is 101
250 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
252 follower = createBehavior(context);
253 follower.handleMessage(leaderActor, appendEntries);
255 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
256 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
258 assertFalse(syncStatus.isInitialSyncDone());
259 assertTrue("append entries reply should be true", reply.isSuccess());
263 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
264 logStart("testHandleFirstAppendEntries");
266 MockRaftActorContext context = createActorContext();
267 context.getReplicatedLog().clear(0,2);
268 context.getReplicatedLog().setSnapshotIndex(100);
270 List<ReplicatedLogEntry> entries = Arrays.asList(
271 newReplicatedLogEntry(2, 101, "foo"));
273 // The new commitIndex is 101
274 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
276 follower = createBehavior(context);
277 follower.handleMessage(leaderActor, appendEntries);
279 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
280 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
282 assertFalse(syncStatus.isInitialSyncDone());
283 assertTrue("append entries reply should be true", reply.isSuccess());
287 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
288 logStart("testHandleFirstAppendEntries");
290 MockRaftActorContext context = createActorContext();
291 context.getReplicatedLog().clear(0,2);
292 context.getReplicatedLog().setSnapshotIndex(100);
294 List<ReplicatedLogEntry> entries = Arrays.asList(
295 newReplicatedLogEntry(2, 105, "foo"));
297 // The new commitIndex is 101
298 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
300 follower = createBehavior(context);
301 follower.handleMessage(leaderActor, appendEntries);
303 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
304 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
306 assertFalse(syncStatus.isInitialSyncDone());
307 assertFalse("append entries reply should be false", reply.isSuccess());
311 public void testHandleSyncUpAppendEntries() throws Exception {
312 logStart("testHandleSyncUpAppendEntries");
314 MockRaftActorContext context = createActorContext();
316 List<ReplicatedLogEntry> entries = Arrays.asList(
317 newReplicatedLogEntry(2, 101, "foo"));
319 // The new commitIndex is 101
320 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
322 follower = createBehavior(context);
323 follower.handleMessage(leaderActor, appendEntries);
325 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
327 assertFalse(syncStatus.isInitialSyncDone());
329 // Clear all the messages
330 followerActor.underlyingActor().clear();
332 context.setLastApplied(101);
333 context.setCommitIndex(101);
334 setLastLogEntry(context, 1, 101,
335 new MockRaftActorContext.MockPayload(""));
337 entries = Arrays.asList(
338 newReplicatedLogEntry(2, 101, "foo"));
340 // The new commitIndex is 101
341 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
342 follower.handleMessage(leaderActor, appendEntries);
344 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
346 assertTrue(syncStatus.isInitialSyncDone());
348 followerActor.underlyingActor().clear();
350 // Sending the same message again should not generate another message
352 follower.handleMessage(leaderActor, appendEntries);
354 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
356 assertNull(syncStatus);
361 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
362 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
364 MockRaftActorContext context = createActorContext();
366 List<ReplicatedLogEntry> entries = Arrays.asList(
367 newReplicatedLogEntry(2, 101, "foo"));
369 // The new commitIndex is 101
370 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
372 follower = createBehavior(context);
373 follower.handleMessage(leaderActor, appendEntries);
375 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
377 assertFalse(syncStatus.isInitialSyncDone());
379 // Clear all the messages
380 followerActor.underlyingActor().clear();
382 context.setLastApplied(100);
383 setLastLogEntry(context, 1, 100,
384 new MockRaftActorContext.MockPayload(""));
386 entries = Arrays.asList(
387 newReplicatedLogEntry(2, 101, "foo"));
389 // leader-2 is becoming the leader now and it says the commitIndex is 45
390 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
391 follower.handleMessage(leaderActor, appendEntries);
393 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
395 // We get a new message saying initial status is not done
396 assertFalse(syncStatus.isInitialSyncDone());
402 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
403 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
405 MockRaftActorContext context = createActorContext();
407 List<ReplicatedLogEntry> entries = Arrays.asList(
408 newReplicatedLogEntry(2, 101, "foo"));
410 // The new commitIndex is 101
411 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
413 follower = createBehavior(context);
414 follower.handleMessage(leaderActor, appendEntries);
416 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
418 assertFalse(syncStatus.isInitialSyncDone());
420 // Clear all the messages
421 followerActor.underlyingActor().clear();
423 context.setLastApplied(101);
424 context.setCommitIndex(101);
425 setLastLogEntry(context, 1, 101,
426 new MockRaftActorContext.MockPayload(""));
428 entries = Arrays.asList(
429 newReplicatedLogEntry(2, 101, "foo"));
431 // The new commitIndex is 101
432 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
433 follower.handleMessage(leaderActor, appendEntries);
435 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
437 assertTrue(syncStatus.isInitialSyncDone());
439 // Clear all the messages
440 followerActor.underlyingActor().clear();
442 context.setLastApplied(100);
443 setLastLogEntry(context, 1, 100,
444 new MockRaftActorContext.MockPayload(""));
446 entries = Arrays.asList(
447 newReplicatedLogEntry(2, 101, "foo"));
449 // leader-2 is becoming the leader now and it says the commitIndex is 45
450 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
451 follower.handleMessage(leaderActor, appendEntries);
453 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
455 // We get a new message saying initial status is not done
456 assertFalse(syncStatus.isInitialSyncDone());
462 * This test verifies that when an AppendEntries RPC is received by a RaftActor
463 * with a commitIndex that is greater than what has been applied to the
464 * state machine of the RaftActor, the RaftActor applies the state and
465 * sets it current applied state to the commitIndex of the sender.
470 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
471 logStart("testHandleAppendEntriesWithNewerCommitIndex");
473 MockRaftActorContext context = createActorContext();
475 context.setLastApplied(100);
476 setLastLogEntry(context, 1, 100,
477 new MockRaftActorContext.MockPayload(""));
478 context.getReplicatedLog().setSnapshotIndex(99);
480 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
481 newReplicatedLogEntry(2, 101, "foo"));
483 // The new commitIndex is 101
484 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
486 follower = createBehavior(context);
487 follower.handleMessage(leaderActor, appendEntries);
489 assertEquals("getLastApplied", 101L, context.getLastApplied());
493 * This test verifies that when an AppendEntries is received a specific prevLogTerm
494 * which does not match the term that is in RaftActors log entry at prevLogIndex
495 * then the RaftActor does not change it's state and it returns a failure.
500 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
501 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
503 MockRaftActorContext context = createActorContext();
505 // First set the receivers term to lower number
506 context.getTermInformation().update(95, "test");
508 // AppendEntries is now sent with a bigger term
509 // this will set the receivers term to be the same as the sender's term
510 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
512 follower = createBehavior(context);
514 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
516 Assert.assertSame(follower, newBehavior);
518 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
519 AppendEntriesReply.class);
521 assertEquals("isSuccess", false, reply.isSuccess());
525 * This test verifies that when a new AppendEntries message is received with
526 * new entries and the logs of the sender and receiver match that the new
527 * entries get added to the log and the log is incremented by the number of
528 * entries received in appendEntries
533 public void testHandleAppendEntriesAddNewEntries() {
534 logStart("testHandleAppendEntriesAddNewEntries");
536 MockRaftActorContext context = createActorContext();
538 // First set the receivers term to lower number
539 context.getTermInformation().update(1, "test");
541 // Prepare the receivers log
542 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
543 log.append(newReplicatedLogEntry(1, 0, "zero"));
544 log.append(newReplicatedLogEntry(1, 1, "one"));
545 log.append(newReplicatedLogEntry(1, 2, "two"));
547 context.setReplicatedLog(log);
549 // Prepare the entries to be sent with AppendEntries
550 List<ReplicatedLogEntry> entries = new ArrayList<>();
551 entries.add(newReplicatedLogEntry(1, 3, "three"));
552 entries.add(newReplicatedLogEntry(1, 4, "four"));
554 // Send appendEntries with the same term as was set on the receiver
555 // before the new behavior was created (1 in this case)
556 // This will not work for a Candidate because as soon as a Candidate
557 // is created it increments the term
558 short leaderPayloadVersion = 10;
559 String leaderId = "leader-1";
560 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
562 follower = createBehavior(context);
564 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
566 Assert.assertSame(follower, newBehavior);
568 assertEquals("Next index", 5, log.last().getIndex() + 1);
569 assertEquals("Entry 3", entries.get(0), log.get(3));
570 assertEquals("Entry 4", entries.get(1), log.get(4));
572 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
573 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
575 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
579 * This test verifies that when a new AppendEntries message is received with
580 * new entries and the logs of the sender and receiver are out-of-sync that
581 * the log is first corrected by removing the out of sync entries from the
582 * log and then adding in the new entries sent with the AppendEntries message
585 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
586 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
588 MockRaftActorContext context = createActorContext();
590 // First set the receivers term to lower number
591 context.getTermInformation().update(1, "test");
593 // Prepare the receivers log
594 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
595 log.append(newReplicatedLogEntry(1, 0, "zero"));
596 log.append(newReplicatedLogEntry(1, 1, "one"));
597 log.append(newReplicatedLogEntry(1, 2, "two"));
599 context.setReplicatedLog(log);
601 // Prepare the entries to be sent with AppendEntries
602 List<ReplicatedLogEntry> entries = new ArrayList<>();
603 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
604 entries.add(newReplicatedLogEntry(2, 3, "three"));
606 // Send appendEntries with the same term as was set on the receiver
607 // before the new behavior was created (1 in this case)
608 // This will not work for a Candidate because as soon as a Candidate
609 // is created it increments the term
610 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
612 follower = createBehavior(context);
614 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
616 Assert.assertSame(follower, newBehavior);
618 // The entry at index 2 will be found out-of-sync with the leader
619 // and will be removed
620 // Then the two new entries will be added to the log
621 // Thus making the log to have 4 entries
622 assertEquals("Next index", 4, log.last().getIndex() + 1);
623 //assertEquals("Entry 2", entries.get(0), log.get(2));
625 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
627 // Check that the entry at index 2 has the new data
628 assertEquals("Entry 2", entries.get(0), log.get(2));
630 assertEquals("Entry 3", entries.get(1), log.get(3));
632 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
636 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
637 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
639 MockRaftActorContext context = createActorContext();
641 // First set the receivers term to lower number
642 context.getTermInformation().update(1, "test");
644 // Prepare the receivers log
645 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
646 log.append(newReplicatedLogEntry(1, 0, "zero"));
647 log.append(newReplicatedLogEntry(1, 1, "one"));
648 log.append(newReplicatedLogEntry(1, 2, "two"));
650 context.setReplicatedLog(log);
652 // Prepare the entries to be sent with AppendEntries
653 List<ReplicatedLogEntry> entries = new ArrayList<>();
654 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
655 entries.add(newReplicatedLogEntry(2, 3, "three"));
657 // Send appendEntries with the same term as was set on the receiver
658 // before the new behavior was created (1 in this case)
659 // This will not work for a Candidate because as soon as a Candidate
660 // is created it increments the term
661 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
663 context.setRaftPolicy(createRaftPolicy(false, true));
664 follower = createBehavior(context);
666 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
668 Assert.assertSame(follower, newBehavior);
670 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
674 public void testHandleAppendEntriesPreviousLogEntryMissing(){
675 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
677 MockRaftActorContext context = createActorContext();
679 // Prepare the receivers log
680 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
681 log.append(newReplicatedLogEntry(1, 0, "zero"));
682 log.append(newReplicatedLogEntry(1, 1, "one"));
683 log.append(newReplicatedLogEntry(1, 2, "two"));
685 context.setReplicatedLog(log);
687 // Prepare the entries to be sent with AppendEntries
688 List<ReplicatedLogEntry> entries = new ArrayList<>();
689 entries.add(newReplicatedLogEntry(1, 4, "four"));
691 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
693 follower = createBehavior(context);
695 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
697 Assert.assertSame(follower, newBehavior);
699 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
703 public void testHandleAppendEntriesWithExistingLogEntry() {
704 logStart("testHandleAppendEntriesWithExistingLogEntry");
706 MockRaftActorContext context = createActorContext();
708 context.getTermInformation().update(1, "test");
710 // Prepare the receivers log
711 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
712 log.append(newReplicatedLogEntry(1, 0, "zero"));
713 log.append(newReplicatedLogEntry(1, 1, "one"));
715 context.setReplicatedLog(log);
717 // Send the last entry again.
718 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
720 follower = createBehavior(context);
722 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
724 assertEquals("Next index", 2, log.last().getIndex() + 1);
725 assertEquals("Entry 1", entries.get(0), log.get(1));
727 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
729 // Send the last entry again and also a new one.
731 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
733 leaderActor.underlyingActor().clear();
734 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
736 assertEquals("Next index", 3, log.last().getIndex() + 1);
737 assertEquals("Entry 1", entries.get(0), log.get(1));
738 assertEquals("Entry 2", entries.get(1), log.get(2));
740 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
744 public void testHandleAppendEntriesAfterInstallingSnapshot(){
745 logStart("testHandleAppendAfterInstallingSnapshot");
747 MockRaftActorContext context = createActorContext();
749 // Prepare the receivers log
750 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
752 // Set up a log as if it has been snapshotted
753 log.setSnapshotIndex(3);
754 log.setSnapshotTerm(1);
756 context.setReplicatedLog(log);
758 // Prepare the entries to be sent with AppendEntries
759 List<ReplicatedLogEntry> entries = new ArrayList<>();
760 entries.add(newReplicatedLogEntry(1, 4, "four"));
762 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
764 follower = createBehavior(context);
766 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
768 Assert.assertSame(follower, newBehavior);
770 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
775 * This test verifies that when InstallSnapshot is received by
776 * the follower its applied correctly.
781 public void testHandleInstallSnapshot() throws Exception {
782 logStart("testHandleInstallSnapshot");
784 MockRaftActorContext context = createActorContext();
785 context.getTermInformation().update(1, "leader");
787 follower = createBehavior(context);
789 ByteString bsSnapshot = createSnapshot();
791 int snapshotLength = bsSnapshot.size();
793 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
794 int lastIncludedIndex = 1;
796 InstallSnapshot lastInstallSnapshot = null;
798 for(int i = 0; i < totalChunks; i++) {
799 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
800 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
801 chunkData, chunkIndex, totalChunks);
802 follower.handleMessage(leaderActor, lastInstallSnapshot);
803 offset = offset + 50;
808 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
809 ApplySnapshot.class);
810 Snapshot snapshot = applySnapshot.getSnapshot();
811 assertNotNull(lastInstallSnapshot);
812 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
813 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
814 snapshot.getLastAppliedTerm());
815 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
816 snapshot.getLastAppliedIndex());
817 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
818 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
819 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
820 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
821 applySnapshot.getCallback().onSuccess();
823 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
824 leaderActor, InstallSnapshotReply.class);
825 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
828 for(InstallSnapshotReply reply: replies) {
829 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
830 assertEquals("getTerm", 1, reply.getTerm());
831 assertEquals("isSuccess", true, reply.isSuccess());
832 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
835 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
840 * Verify that when an AppendEntries is sent to a follower during a snapshot install
841 * the Follower short-circuits the processing of the AppendEntries message.
846 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
847 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
849 MockRaftActorContext context = createActorContext();
851 follower = createBehavior(context);
853 ByteString bsSnapshot = createSnapshot();
854 int snapshotLength = bsSnapshot.size();
856 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
857 int lastIncludedIndex = 1;
859 // Check that snapshot installation is not in progress
860 assertNull(follower.getSnapshotTracker());
862 // Make sure that we have more than 1 chunk to send
863 assertTrue(totalChunks > 1);
865 // Send an install snapshot with the first chunk to start the process of installing a snapshot
866 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
867 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
868 chunkData, 1, totalChunks));
870 // Check if snapshot installation is in progress now
871 assertNotNull(follower.getSnapshotTracker());
873 // Send an append entry
874 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
875 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
877 follower.handleMessage(leaderActor, appendEntries);
879 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
880 assertEquals("isSuccess", true, reply.isSuccess());
881 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
882 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
883 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
885 assertNotNull(follower.getSnapshotTracker());
889 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
890 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
892 MockRaftActorContext context = createActorContext();
894 follower = createBehavior(context);
896 ByteString bsSnapshot = createSnapshot();
897 int snapshotLength = bsSnapshot.size();
899 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
900 int lastIncludedIndex = 1;
902 // Check that snapshot installation is not in progress
903 assertNull(follower.getSnapshotTracker());
905 // Make sure that we have more than 1 chunk to send
906 assertTrue(totalChunks > 1);
908 // Send an install snapshot with the first chunk to start the process of installing a snapshot
909 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
910 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
911 chunkData, 1, totalChunks));
913 // Check if snapshot installation is in progress now
914 assertNotNull(follower.getSnapshotTracker());
916 // Send appendEntries with a new term and leader.
917 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
918 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
920 follower.handleMessage(leaderActor, appendEntries);
922 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
923 assertEquals("isSuccess", true, reply.isSuccess());
924 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
925 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
926 assertEquals("getTerm", 2, reply.getTerm());
928 assertNull(follower.getSnapshotTracker());
932 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
933 logStart("testInitialSyncUpWithHandleInstallSnapshot");
935 MockRaftActorContext context = createActorContext();
936 context.setCommitIndex(-1);
938 follower = createBehavior(context);
940 ByteString bsSnapshot = createSnapshot();
942 int snapshotLength = bsSnapshot.size();
944 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
945 int lastIncludedIndex = 1;
947 InstallSnapshot lastInstallSnapshot = null;
949 for(int i = 0; i < totalChunks; i++) {
950 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
951 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
952 chunkData, chunkIndex, totalChunks);
953 follower.handleMessage(leaderActor, lastInstallSnapshot);
954 offset = offset + 50;
959 FollowerInitialSyncUpStatus syncStatus =
960 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
962 assertFalse(syncStatus.isInitialSyncDone());
964 // Clear all the messages
965 followerActor.underlyingActor().clear();
967 context.setLastApplied(101);
968 context.setCommitIndex(101);
969 setLastLogEntry(context, 1, 101,
970 new MockRaftActorContext.MockPayload(""));
972 List<ReplicatedLogEntry> entries = Arrays.asList(
973 newReplicatedLogEntry(2, 101, "foo"));
975 // The new commitIndex is 101
976 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
977 follower.handleMessage(leaderActor, appendEntries);
979 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
981 assertTrue(syncStatus.isInitialSyncDone());
985 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
986 logStart("testHandleOutOfSequenceInstallSnapshot");
988 MockRaftActorContext context = createActorContext();
990 follower = createBehavior(context);
992 ByteString bsSnapshot = createSnapshot();
994 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
995 getNextChunk(bsSnapshot, 10, 50), 3, 3);
996 follower.handleMessage(leaderActor, installSnapshot);
998 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
999 InstallSnapshotReply.class);
1001 assertEquals("isSuccess", false, reply.isSuccess());
1002 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1003 assertEquals("getTerm", 1, reply.getTerm());
1004 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1006 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1010 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
1011 MockRaftActorContext context = createActorContext();
1013 Stopwatch stopwatch = Stopwatch.createStarted();
1015 follower = createBehavior(context);
1017 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1019 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1021 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1023 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1024 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1028 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){
1029 MockRaftActorContext context = createActorContext();
1030 context.setConfigParams(new DefaultConfigParamsImpl(){
1032 public FiniteDuration getElectionTimeOutInterval() {
1033 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1037 context.setRaftPolicy(createRaftPolicy(false, false));
1039 follower = createBehavior(context);
1041 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1042 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1043 assertSame("handleMessage result", follower, newBehavior);
1047 public void testFollowerSchedulesElectionIfNonVoting(){
1048 MockRaftActorContext context = createActorContext();
1049 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1050 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1051 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1052 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1054 follower = new Follower(context, "leader", (short)1);
1056 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1057 ElectionTimeout.class);
1058 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1059 assertSame("handleMessage result", follower, newBehavior);
1060 assertNull("Expected null leaderId", follower.getLeaderId());
1064 public void testElectionScheduledWhenAnyRaftRPCReceived(){
1065 MockRaftActorContext context = createActorContext();
1066 follower = createBehavior(context);
1067 follower.handleMessage(leaderActor, new RaftRPC() {
1068 private static final long serialVersionUID = 1L;
1071 public long getTerm() {
1075 verify(follower).scheduleElection(any(FiniteDuration.class));
1079 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
1080 MockRaftActorContext context = createActorContext();
1081 follower = createBehavior(context);
1082 follower.handleMessage(leaderActor, "non-raft-rpc");
1083 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1086 public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
1087 int snapshotLength = bs.size();
1089 int size = chunkSize;
1090 if (chunkSize > snapshotLength) {
1091 size = snapshotLength;
1093 if (start + chunkSize > snapshotLength) {
1094 size = snapshotLength - start;
1098 byte[] nextChunk = new byte[size];
1099 bs.copyTo(nextChunk, start, 0, size);
1103 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1104 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1105 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1108 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1109 String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1110 boolean expForceInstallSnapshot) {
1112 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1113 AppendEntriesReply.class);
1115 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1116 assertEquals("getTerm", expTerm, reply.getTerm());
1117 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1118 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1119 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1120 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1121 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1125 private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1126 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1127 new MockRaftActorContext.MockPayload(data));
1130 private ByteString createSnapshot(){
1131 HashMap<String, String> followerSnapshot = new HashMap<>();
1132 followerSnapshot.put("1", "A");
1133 followerSnapshot.put("2", "B");
1134 followerSnapshot.put("3", "C");
1136 return toByteString(followerSnapshot);
1140 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1141 ActorRef actorRef, RaftRPC rpc) throws Exception {
1142 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1144 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1145 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1149 protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1151 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1152 assertEquals("isSuccess", true, reply.isSuccess());