2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.testkit.TestActorRef;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Assert;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.Snapshot;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
44 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
45 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
48 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
49 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
50 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
51 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
52 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
53 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
54 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
55 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
56 import scala.concurrent.duration.FiniteDuration;
58 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
60 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
61 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
63 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
64 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
66 private Follower follower;
68 private final short payloadVersion = 5;
72 public void tearDown() throws Exception {
73 if (follower != null) {
81 protected Follower createBehavior(RaftActorContext actorContext) {
82 return spy(new Follower(actorContext));
86 protected MockRaftActorContext createActorContext() {
87 return createActorContext(followerActor);
91 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
92 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
93 context.setPayloadVersion(payloadVersion );
98 public void testThatAnElectionTimeoutIsTriggered() {
99 MockRaftActorContext actorContext = createActorContext();
100 follower = new Follower(actorContext);
102 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
103 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
107 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
108 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
110 MockRaftActorContext context = createActorContext();
111 follower = new Follower(context);
113 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
114 TimeUnit.MILLISECONDS);
115 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
117 assertTrue(raftBehavior instanceof Candidate);
121 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
122 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
124 MockRaftActorContext context = createActorContext();
125 ((DefaultConfigParamsImpl) context.getConfigParams())
126 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
127 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
129 follower = new Follower(context);
130 context.setCurrentBehavior(follower);
132 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
133 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
134 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
137 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
138 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
139 assertTrue(raftBehavior instanceof Follower);
141 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
142 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
143 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
146 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
147 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
148 assertTrue(raftBehavior instanceof Follower);
152 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
153 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
155 MockRaftActorContext context = createActorContext();
157 context.getTermInformation().update(term, null);
159 follower = createBehavior(context);
161 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
163 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
165 assertEquals("isVoteGranted", true, reply.isVoteGranted());
166 assertEquals("getTerm", term, reply.getTerm());
167 verify(follower).scheduleElection(any(FiniteDuration.class));
171 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
172 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
174 MockRaftActorContext context = createActorContext();
176 context.getTermInformation().update(term, "test");
178 follower = createBehavior(context);
180 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
182 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
184 assertEquals("isVoteGranted", false, reply.isVoteGranted());
185 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
190 public void testHandleFirstAppendEntries() throws Exception {
191 logStart("testHandleFirstAppendEntries");
193 MockRaftActorContext context = createActorContext();
194 context.getReplicatedLog().clear(0,2);
195 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
196 context.getReplicatedLog().setSnapshotIndex(99);
198 List<ReplicatedLogEntry> entries = Arrays.asList(
199 newReplicatedLogEntry(2, 101, "foo"));
201 Assert.assertEquals(1, context.getReplicatedLog().size());
203 // The new commitIndex is 101
204 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
206 follower = createBehavior(context);
207 follower.handleMessage(leaderActor, appendEntries);
209 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
210 FollowerInitialSyncUpStatus.class);
211 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
213 assertFalse(syncStatus.isInitialSyncDone());
214 assertTrue("append entries reply should be true", reply.isSuccess());
218 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
219 logStart("testHandleFirstAppendEntries");
221 MockRaftActorContext context = createActorContext();
223 List<ReplicatedLogEntry> entries = Arrays.asList(
224 newReplicatedLogEntry(2, 101, "foo"));
226 // The new commitIndex is 101
227 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
229 follower = createBehavior(context);
230 follower.handleMessage(leaderActor, appendEntries);
232 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
233 FollowerInitialSyncUpStatus.class);
234 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
236 assertFalse(syncStatus.isInitialSyncDone());
237 assertFalse("append entries reply should be false", reply.isSuccess());
241 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
243 logStart("testHandleFirstAppendEntries");
245 MockRaftActorContext context = createActorContext();
246 context.getReplicatedLog().clear(0,2);
247 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
248 context.getReplicatedLog().setSnapshotIndex(99);
250 List<ReplicatedLogEntry> entries = Arrays.asList(
251 newReplicatedLogEntry(2, 101, "foo"));
253 // The new commitIndex is 101
254 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
256 follower = createBehavior(context);
257 follower.handleMessage(leaderActor, appendEntries);
259 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
260 FollowerInitialSyncUpStatus.class);
261 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
263 assertFalse(syncStatus.isInitialSyncDone());
264 assertTrue("append entries reply should be true", reply.isSuccess());
268 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
270 logStart("testHandleFirstAppendEntries");
272 MockRaftActorContext context = createActorContext();
273 context.getReplicatedLog().clear(0,2);
274 context.getReplicatedLog().setSnapshotIndex(100);
276 List<ReplicatedLogEntry> entries = Arrays.asList(
277 newReplicatedLogEntry(2, 101, "foo"));
279 // The new commitIndex is 101
280 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
282 follower = createBehavior(context);
283 follower.handleMessage(leaderActor, appendEntries);
285 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
286 FollowerInitialSyncUpStatus.class);
287 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
289 assertFalse(syncStatus.isInitialSyncDone());
290 assertTrue("append entries reply should be true", reply.isSuccess());
294 public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
297 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
299 MockRaftActorContext context = createActorContext();
300 context.getReplicatedLog().clear(0,2);
301 context.getReplicatedLog().setSnapshotIndex(100);
303 List<ReplicatedLogEntry> entries = Arrays.asList(
304 newReplicatedLogEntry(2, 105, "foo"));
306 // The new commitIndex is 101
307 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
309 follower = createBehavior(context);
310 follower.handleMessage(leaderActor, appendEntries);
312 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
313 FollowerInitialSyncUpStatus.class);
314 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
316 assertFalse(syncStatus.isInitialSyncDone());
317 assertFalse("append entries reply should be false", reply.isSuccess());
321 public void testHandleSyncUpAppendEntries() throws Exception {
322 logStart("testHandleSyncUpAppendEntries");
324 MockRaftActorContext context = createActorContext();
326 List<ReplicatedLogEntry> entries = Arrays.asList(
327 newReplicatedLogEntry(2, 101, "foo"));
329 // The new commitIndex is 101
330 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
332 follower = createBehavior(context);
333 follower.handleMessage(leaderActor, appendEntries);
335 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
336 FollowerInitialSyncUpStatus.class);
338 assertFalse(syncStatus.isInitialSyncDone());
340 // Clear all the messages
341 followerActor.underlyingActor().clear();
343 context.setLastApplied(101);
344 context.setCommitIndex(101);
345 setLastLogEntry(context, 1, 101,
346 new MockRaftActorContext.MockPayload(""));
348 entries = Arrays.asList(
349 newReplicatedLogEntry(2, 101, "foo"));
351 // The new commitIndex is 101
352 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
353 follower.handleMessage(leaderActor, appendEntries);
355 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
357 assertTrue(syncStatus.isInitialSyncDone());
359 followerActor.underlyingActor().clear();
361 // Sending the same message again should not generate another message
363 follower.handleMessage(leaderActor, appendEntries);
365 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
367 assertNull(syncStatus);
372 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
373 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
375 MockRaftActorContext context = createActorContext();
377 List<ReplicatedLogEntry> entries = Arrays.asList(
378 newReplicatedLogEntry(2, 101, "foo"));
380 // The new commitIndex is 101
381 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
383 follower = createBehavior(context);
384 follower.handleMessage(leaderActor, appendEntries);
386 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
387 FollowerInitialSyncUpStatus.class);
389 assertFalse(syncStatus.isInitialSyncDone());
391 // Clear all the messages
392 followerActor.underlyingActor().clear();
394 context.setLastApplied(100);
395 setLastLogEntry(context, 1, 100,
396 new MockRaftActorContext.MockPayload(""));
398 entries = Arrays.asList(
399 newReplicatedLogEntry(2, 101, "foo"));
401 // leader-2 is becoming the leader now and it says the commitIndex is 45
402 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
403 follower.handleMessage(leaderActor, appendEntries);
405 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
407 // We get a new message saying initial status is not done
408 assertFalse(syncStatus.isInitialSyncDone());
414 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
415 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
417 MockRaftActorContext context = createActorContext();
419 List<ReplicatedLogEntry> entries = Arrays.asList(
420 newReplicatedLogEntry(2, 101, "foo"));
422 // The new commitIndex is 101
423 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
425 follower = createBehavior(context);
426 follower.handleMessage(leaderActor, appendEntries);
428 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
429 FollowerInitialSyncUpStatus.class);
431 assertFalse(syncStatus.isInitialSyncDone());
433 // Clear all the messages
434 followerActor.underlyingActor().clear();
436 context.setLastApplied(101);
437 context.setCommitIndex(101);
438 setLastLogEntry(context, 1, 101,
439 new MockRaftActorContext.MockPayload(""));
441 entries = Arrays.asList(
442 newReplicatedLogEntry(2, 101, "foo"));
444 // The new commitIndex is 101
445 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
446 follower.handleMessage(leaderActor, appendEntries);
448 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
450 assertTrue(syncStatus.isInitialSyncDone());
452 // Clear all the messages
453 followerActor.underlyingActor().clear();
455 context.setLastApplied(100);
456 setLastLogEntry(context, 1, 100,
457 new MockRaftActorContext.MockPayload(""));
459 entries = Arrays.asList(
460 newReplicatedLogEntry(2, 101, "foo"));
462 // leader-2 is becoming the leader now and it says the commitIndex is 45
463 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
464 follower.handleMessage(leaderActor, appendEntries);
466 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
468 // We get a new message saying initial status is not done
469 assertFalse(syncStatus.isInitialSyncDone());
475 * This test verifies that when an AppendEntries RPC is received by a RaftActor
476 * with a commitIndex that is greater than what has been applied to the
477 * state machine of the RaftActor, the RaftActor applies the state and
478 * sets it current applied state to the commitIndex of the sender.
481 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
482 logStart("testHandleAppendEntriesWithNewerCommitIndex");
484 MockRaftActorContext context = createActorContext();
486 context.setLastApplied(100);
487 setLastLogEntry(context, 1, 100,
488 new MockRaftActorContext.MockPayload(""));
489 context.getReplicatedLog().setSnapshotIndex(99);
491 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
492 newReplicatedLogEntry(2, 101, "foo"));
494 // The new commitIndex is 101
495 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
497 follower = createBehavior(context);
498 follower.handleMessage(leaderActor, appendEntries);
500 assertEquals("getLastApplied", 101L, context.getLastApplied());
504 * This test verifies that when an AppendEntries is received a specific prevLogTerm
505 * which does not match the term that is in RaftActors log entry at prevLogIndex
506 * then the RaftActor does not change it's state and it returns a failure.
509 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
510 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
512 MockRaftActorContext context = createActorContext();
514 // First set the receivers term to lower number
515 context.getTermInformation().update(95, "test");
517 // AppendEntries is now sent with a bigger term
518 // this will set the receivers term to be the same as the sender's term
519 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
521 follower = createBehavior(context);
523 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
525 Assert.assertSame(follower, newBehavior);
527 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
528 AppendEntriesReply.class);
530 assertEquals("isSuccess", false, reply.isSuccess());
534 * This test verifies that when a new AppendEntries message is received with
535 * new entries and the logs of the sender and receiver match that the new
536 * entries get added to the log and the log is incremented by the number of
537 * entries received in appendEntries.
540 public void testHandleAppendEntriesAddNewEntries() {
541 logStart("testHandleAppendEntriesAddNewEntries");
543 MockRaftActorContext context = createActorContext();
545 // First set the receivers term to lower number
546 context.getTermInformation().update(1, "test");
548 // Prepare the receivers log
549 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
550 log.append(newReplicatedLogEntry(1, 0, "zero"));
551 log.append(newReplicatedLogEntry(1, 1, "one"));
552 log.append(newReplicatedLogEntry(1, 2, "two"));
554 context.setReplicatedLog(log);
556 // Prepare the entries to be sent with AppendEntries
557 List<ReplicatedLogEntry> entries = new ArrayList<>();
558 entries.add(newReplicatedLogEntry(1, 3, "three"));
559 entries.add(newReplicatedLogEntry(1, 4, "four"));
561 // Send appendEntries with the same term as was set on the receiver
562 // before the new behavior was created (1 in this case)
563 // This will not work for a Candidate because as soon as a Candidate
564 // is created it increments the term
565 short leaderPayloadVersion = 10;
566 String leaderId = "leader-1";
567 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
569 follower = createBehavior(context);
571 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
573 Assert.assertSame(follower, newBehavior);
575 assertEquals("Next index", 5, log.last().getIndex() + 1);
576 assertEquals("Entry 3", entries.get(0), log.get(3));
577 assertEquals("Entry 4", entries.get(1), log.get(4));
579 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
580 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
582 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
586 * This test verifies that when a new AppendEntries message is received with
587 * new entries and the logs of the sender and receiver are out-of-sync that
588 * the log is first corrected by removing the out of sync entries from the
589 * log and then adding in the new entries sent with the AppendEntries message.
592 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
593 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
595 MockRaftActorContext context = createActorContext();
597 // First set the receivers term to lower number
598 context.getTermInformation().update(1, "test");
600 // Prepare the receivers log
601 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
602 log.append(newReplicatedLogEntry(1, 0, "zero"));
603 log.append(newReplicatedLogEntry(1, 1, "one"));
604 log.append(newReplicatedLogEntry(1, 2, "two"));
606 context.setReplicatedLog(log);
608 // Prepare the entries to be sent with AppendEntries
609 List<ReplicatedLogEntry> entries = new ArrayList<>();
610 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
611 entries.add(newReplicatedLogEntry(2, 3, "three"));
613 // Send appendEntries with the same term as was set on the receiver
614 // before the new behavior was created (1 in this case)
615 // This will not work for a Candidate because as soon as a Candidate
616 // is created it increments the term
617 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
619 follower = createBehavior(context);
621 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
623 Assert.assertSame(follower, newBehavior);
625 // The entry at index 2 will be found out-of-sync with the leader
626 // and will be removed
627 // Then the two new entries will be added to the log
628 // Thus making the log to have 4 entries
629 assertEquals("Next index", 4, log.last().getIndex() + 1);
630 //assertEquals("Entry 2", entries.get(0), log.get(2));
632 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
634 // Check that the entry at index 2 has the new data
635 assertEquals("Entry 2", entries.get(0), log.get(2));
637 assertEquals("Entry 3", entries.get(1), log.get(3));
639 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
643 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
644 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
646 MockRaftActorContext context = createActorContext();
648 // First set the receivers term to lower number
649 context.getTermInformation().update(1, "test");
651 // Prepare the receivers log
652 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
653 log.append(newReplicatedLogEntry(1, 0, "zero"));
654 log.append(newReplicatedLogEntry(1, 1, "one"));
655 log.append(newReplicatedLogEntry(1, 2, "two"));
657 context.setReplicatedLog(log);
659 // Prepare the entries to be sent with AppendEntries
660 List<ReplicatedLogEntry> entries = new ArrayList<>();
661 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
662 entries.add(newReplicatedLogEntry(2, 3, "three"));
664 // Send appendEntries with the same term as was set on the receiver
665 // before the new behavior was created (1 in this case)
666 // This will not work for a Candidate because as soon as a Candidate
667 // is created it increments the term
668 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
670 context.setRaftPolicy(createRaftPolicy(false, true));
671 follower = createBehavior(context);
673 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
675 Assert.assertSame(follower, newBehavior);
677 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
681 public void testHandleAppendEntriesPreviousLogEntryMissing() {
682 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
684 final MockRaftActorContext context = createActorContext();
686 // Prepare the receivers log
687 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
688 log.append(newReplicatedLogEntry(1, 0, "zero"));
689 log.append(newReplicatedLogEntry(1, 1, "one"));
690 log.append(newReplicatedLogEntry(1, 2, "two"));
692 context.setReplicatedLog(log);
694 // Prepare the entries to be sent with AppendEntries
695 List<ReplicatedLogEntry> entries = new ArrayList<>();
696 entries.add(newReplicatedLogEntry(1, 4, "four"));
698 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
700 follower = createBehavior(context);
702 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
704 Assert.assertSame(follower, newBehavior);
706 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
710 public void testHandleAppendEntriesWithExistingLogEntry() {
711 logStart("testHandleAppendEntriesWithExistingLogEntry");
713 MockRaftActorContext context = createActorContext();
715 context.getTermInformation().update(1, "test");
717 // Prepare the receivers log
718 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
719 log.append(newReplicatedLogEntry(1, 0, "zero"));
720 log.append(newReplicatedLogEntry(1, 1, "one"));
722 context.setReplicatedLog(log);
724 // Send the last entry again.
725 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
727 follower = createBehavior(context);
729 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
731 assertEquals("Next index", 2, log.last().getIndex() + 1);
732 assertEquals("Entry 1", entries.get(0), log.get(1));
734 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
736 // Send the last entry again and also a new one.
738 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
740 leaderActor.underlyingActor().clear();
741 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
743 assertEquals("Next index", 3, log.last().getIndex() + 1);
744 assertEquals("Entry 1", entries.get(0), log.get(1));
745 assertEquals("Entry 2", entries.get(1), log.get(2));
747 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
751 public void testHandleAppendEntriesAfterInstallingSnapshot() {
752 logStart("testHandleAppendAfterInstallingSnapshot");
754 MockRaftActorContext context = createActorContext();
756 // Prepare the receivers log
757 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
759 // Set up a log as if it has been snapshotted
760 log.setSnapshotIndex(3);
761 log.setSnapshotTerm(1);
763 context.setReplicatedLog(log);
765 // Prepare the entries to be sent with AppendEntries
766 List<ReplicatedLogEntry> entries = new ArrayList<>();
767 entries.add(newReplicatedLogEntry(1, 4, "four"));
769 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
771 follower = createBehavior(context);
773 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
775 Assert.assertSame(follower, newBehavior);
777 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
782 * This test verifies that when InstallSnapshot is received by
783 * the follower its applied correctly.
786 public void testHandleInstallSnapshot() throws Exception {
787 logStart("testHandleInstallSnapshot");
789 MockRaftActorContext context = createActorContext();
790 context.getTermInformation().update(1, "leader");
792 follower = createBehavior(context);
794 ByteString bsSnapshot = createSnapshot();
796 int snapshotLength = bsSnapshot.size();
798 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
799 int lastIncludedIndex = 1;
801 InstallSnapshot lastInstallSnapshot = null;
803 for (int i = 0; i < totalChunks; i++) {
804 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
805 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
806 chunkData, chunkIndex, totalChunks);
807 follower.handleMessage(leaderActor, lastInstallSnapshot);
808 offset = offset + 50;
813 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
814 ApplySnapshot.class);
815 Snapshot snapshot = applySnapshot.getSnapshot();
816 assertNotNull(lastInstallSnapshot);
817 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
818 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
819 snapshot.getLastAppliedTerm());
820 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
821 snapshot.getLastAppliedIndex());
822 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
823 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
824 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
825 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
826 applySnapshot.getCallback().onSuccess();
828 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
829 leaderActor, InstallSnapshotReply.class);
830 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
833 for (InstallSnapshotReply reply: replies) {
834 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
835 assertEquals("getTerm", 1, reply.getTerm());
836 assertEquals("isSuccess", true, reply.isSuccess());
837 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
840 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
845 * Verify that when an AppendEntries is sent to a follower during a snapshot install
846 * the Follower short-circuits the processing of the AppendEntries message.
849 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
850 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
852 MockRaftActorContext context = createActorContext();
854 follower = createBehavior(context);
856 ByteString bsSnapshot = createSnapshot();
857 int snapshotLength = bsSnapshot.size();
859 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
860 int lastIncludedIndex = 1;
862 // Check that snapshot installation is not in progress
863 assertNull(follower.getSnapshotTracker());
865 // Make sure that we have more than 1 chunk to send
866 assertTrue(totalChunks > 1);
868 // Send an install snapshot with the first chunk to start the process of installing a snapshot
869 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
870 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
871 chunkData, 1, totalChunks));
873 // Check if snapshot installation is in progress now
874 assertNotNull(follower.getSnapshotTracker());
876 // Send an append entry
877 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
878 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
880 follower.handleMessage(leaderActor, appendEntries);
882 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
883 assertEquals("isSuccess", true, reply.isSuccess());
884 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
885 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
886 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
888 assertNotNull(follower.getSnapshotTracker());
892 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
893 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
895 MockRaftActorContext context = createActorContext();
897 follower = createBehavior(context);
899 ByteString bsSnapshot = createSnapshot();
900 int snapshotLength = bsSnapshot.size();
902 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
903 int lastIncludedIndex = 1;
905 // Check that snapshot installation is not in progress
906 assertNull(follower.getSnapshotTracker());
908 // Make sure that we have more than 1 chunk to send
909 assertTrue(totalChunks > 1);
911 // Send an install snapshot with the first chunk to start the process of installing a snapshot
912 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
913 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
914 chunkData, 1, totalChunks));
916 // Check if snapshot installation is in progress now
917 assertNotNull(follower.getSnapshotTracker());
919 // Send appendEntries with a new term and leader.
920 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
921 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
923 follower.handleMessage(leaderActor, appendEntries);
925 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
926 assertEquals("isSuccess", true, reply.isSuccess());
927 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
928 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
929 assertEquals("getTerm", 2, reply.getTerm());
931 assertNull(follower.getSnapshotTracker());
935 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
936 logStart("testInitialSyncUpWithHandleInstallSnapshot");
938 MockRaftActorContext context = createActorContext();
939 context.setCommitIndex(-1);
941 follower = createBehavior(context);
943 ByteString bsSnapshot = createSnapshot();
945 int snapshotLength = bsSnapshot.size();
947 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
948 int lastIncludedIndex = 1;
950 InstallSnapshot lastInstallSnapshot = null;
952 for (int i = 0; i < totalChunks; i++) {
953 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
954 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
955 chunkData, chunkIndex, totalChunks);
956 follower.handleMessage(leaderActor, lastInstallSnapshot);
957 offset = offset + 50;
962 FollowerInitialSyncUpStatus syncStatus =
963 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
965 assertFalse(syncStatus.isInitialSyncDone());
967 // Clear all the messages
968 followerActor.underlyingActor().clear();
970 context.setLastApplied(101);
971 context.setCommitIndex(101);
972 setLastLogEntry(context, 1, 101,
973 new MockRaftActorContext.MockPayload(""));
975 List<ReplicatedLogEntry> entries = Arrays.asList(
976 newReplicatedLogEntry(2, 101, "foo"));
978 // The new commitIndex is 101
979 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
980 follower.handleMessage(leaderActor, appendEntries);
982 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
984 assertTrue(syncStatus.isInitialSyncDone());
988 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
989 logStart("testHandleOutOfSequenceInstallSnapshot");
991 MockRaftActorContext context = createActorContext();
993 follower = createBehavior(context);
995 ByteString bsSnapshot = createSnapshot();
997 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
998 getNextChunk(bsSnapshot, 10, 50), 3, 3);
999 follower.handleMessage(leaderActor, installSnapshot);
1001 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1002 InstallSnapshotReply.class);
1004 assertEquals("isSuccess", false, reply.isSuccess());
1005 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1006 assertEquals("getTerm", 1, reply.getTerm());
1007 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1009 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1013 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1014 MockRaftActorContext context = createActorContext();
1016 Stopwatch stopwatch = Stopwatch.createStarted();
1018 follower = createBehavior(context);
1020 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1022 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1024 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1026 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1027 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1031 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1032 MockRaftActorContext context = createActorContext();
1033 context.setConfigParams(new DefaultConfigParamsImpl() {
1035 public FiniteDuration getElectionTimeOutInterval() {
1036 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1040 context.setRaftPolicy(createRaftPolicy(false, false));
1042 follower = createBehavior(context);
1044 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1045 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1046 assertSame("handleMessage result", follower, newBehavior);
1050 public void testFollowerSchedulesElectionIfNonVoting() {
1051 MockRaftActorContext context = createActorContext();
1052 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1053 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1054 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1055 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1057 follower = new Follower(context, "leader", (short)1);
1059 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1060 ElectionTimeout.class);
1061 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1062 assertSame("handleMessage result", follower, newBehavior);
1063 assertNull("Expected null leaderId", follower.getLeaderId());
1067 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1068 MockRaftActorContext context = createActorContext();
1069 follower = createBehavior(context);
1070 follower.handleMessage(leaderActor, new RaftRPC() {
1071 private static final long serialVersionUID = 1L;
1074 public long getTerm() {
1078 verify(follower).scheduleElection(any(FiniteDuration.class));
1082 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1083 MockRaftActorContext context = createActorContext();
1084 follower = createBehavior(context);
1085 follower.handleMessage(leaderActor, "non-raft-rpc");
1086 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1089 public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
1090 int snapshotLength = bs.size();
1092 int size = chunkSize;
1093 if (chunkSize > snapshotLength) {
1094 size = snapshotLength;
1096 if (start + chunkSize > snapshotLength) {
1097 size = snapshotLength - start;
1101 byte[] nextChunk = new byte[size];
1102 bs.copyTo(nextChunk, start, 0, size);
1106 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1107 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1108 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1111 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1112 String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1113 boolean expForceInstallSnapshot) {
1115 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1116 AppendEntriesReply.class);
1118 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1119 assertEquals("getTerm", expTerm, reply.getTerm());
1120 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1121 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1122 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1123 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1124 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1128 private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1129 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1130 new MockRaftActorContext.MockPayload(data));
1133 private ByteString createSnapshot() {
1134 HashMap<String, String> followerSnapshot = new HashMap<>();
1135 followerSnapshot.put("1", "A");
1136 followerSnapshot.put("2", "B");
1137 followerSnapshot.put("3", "C");
1139 return toByteString(followerSnapshot);
1143 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1144 ActorRef actorRef, RaftRPC rpc) throws Exception {
1145 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1147 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1148 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1152 protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1154 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1155 assertEquals("isSuccess", true, reply.isSuccess());