2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.dispatch.Dispatchers;
24 import akka.protobuf.ByteString;
25 import akka.testkit.TestActorRef;
26 import akka.testkit.javadsl.TestKit;
27 import com.google.common.base.Stopwatch;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.OutputStream;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Optional;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.MockRaftActor;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
48 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
49 import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
50 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
51 import org.opendaylight.controller.cluster.raft.RaftActorContext;
52 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
60 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
66 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
67 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
68 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
69 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
70 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
71 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
72 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
73 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
75 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
76 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
77 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
78 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
79 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
80 import scala.concurrent.duration.FiniteDuration;
82 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
84 private final ActorRef followerActor = actorFactory.createActor(
85 MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
87 private final ActorRef leaderActor = actorFactory.createActor(
88 MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
90 private Follower follower;
92 private final short payloadVersion = 5;
96 public void tearDown() {
97 if (follower != null) {
105 protected Follower createBehavior(final RaftActorContext actorContext) {
106 return spy(new Follower(actorContext));
110 protected MockRaftActorContext createActorContext() {
111 return createActorContext(followerActor);
115 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
116 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
117 context.setPayloadVersion(payloadVersion);
118 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
119 peerId -> leaderActor.path().toString());
124 public void testThatAnElectionTimeoutIsTriggered() {
125 MockRaftActorContext actorContext = createActorContext();
126 follower = new Follower(actorContext);
128 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
129 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
133 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
134 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
136 MockRaftActorContext context = createActorContext();
137 follower = new Follower(context);
139 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
140 TimeUnit.MILLISECONDS);
141 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
143 assertTrue(raftBehavior instanceof Candidate);
147 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
148 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
150 MockRaftActorContext context = createActorContext();
151 ((DefaultConfigParamsImpl) context.getConfigParams())
152 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
153 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
155 follower = new Follower(context);
156 context.setCurrentBehavior(follower);
158 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
159 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
160 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
163 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
164 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
165 assertTrue(raftBehavior instanceof Follower);
167 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
168 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
169 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
172 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
173 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
174 assertTrue(raftBehavior instanceof Follower);
178 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
179 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
181 MockRaftActorContext context = createActorContext();
183 context.getTermInformation().update(term, null);
185 follower = createBehavior(context);
187 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
189 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
191 assertEquals("isVoteGranted", true, reply.isVoteGranted());
192 assertEquals("getTerm", term, reply.getTerm());
193 verify(follower).scheduleElection(any(FiniteDuration.class));
197 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
198 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
200 MockRaftActorContext context = createActorContext();
202 context.getTermInformation().update(term, "test");
204 follower = createBehavior(context);
206 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
208 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
210 assertEquals("isVoteGranted", false, reply.isVoteGranted());
211 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
216 public void testHandleFirstAppendEntries() {
217 logStart("testHandleFirstAppendEntries");
219 MockRaftActorContext context = createActorContext();
220 context.getReplicatedLog().clear(0,2);
221 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
222 context.getReplicatedLog().setSnapshotIndex(99);
224 List<ReplicatedLogEntry> entries = Arrays.asList(
225 newReplicatedLogEntry(2, 101, "foo"));
227 Assert.assertEquals(1, context.getReplicatedLog().size());
229 // The new commitIndex is 101
230 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
232 follower = createBehavior(context);
233 follower.handleMessage(leaderActor, appendEntries);
235 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
236 FollowerInitialSyncUpStatus.class);
237 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
239 assertFalse(syncStatus.isInitialSyncDone());
240 assertTrue("append entries reply should be true", reply.isSuccess());
244 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
245 logStart("testHandleFirstAppendEntries");
247 MockRaftActorContext context = createActorContext();
249 List<ReplicatedLogEntry> entries = Arrays.asList(
250 newReplicatedLogEntry(2, 101, "foo"));
252 // The new commitIndex is 101
253 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
255 follower = createBehavior(context);
256 follower.handleMessage(leaderActor, appendEntries);
258 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
259 FollowerInitialSyncUpStatus.class);
260 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
262 assertFalse(syncStatus.isInitialSyncDone());
263 assertFalse("append entries reply should be false", reply.isSuccess());
267 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
268 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
270 MockRaftActorContext context = createActorContext();
271 context.getReplicatedLog().clear(0,2);
272 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
273 context.getReplicatedLog().setSnapshotIndex(99);
275 List<ReplicatedLogEntry> entries = Arrays.asList(
276 newReplicatedLogEntry(2, 101, "foo"));
278 // The new commitIndex is 101
279 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
281 follower = createBehavior(context);
282 follower.handleMessage(leaderActor, appendEntries);
284 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
285 FollowerInitialSyncUpStatus.class);
286 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
288 assertFalse(syncStatus.isInitialSyncDone());
289 assertTrue("append entries reply should be true", reply.isSuccess());
293 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
294 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
296 MockRaftActorContext context = createActorContext();
297 context.getReplicatedLog().clear(0,2);
298 context.getReplicatedLog().setSnapshotIndex(100);
300 List<ReplicatedLogEntry> entries = Arrays.asList(
301 newReplicatedLogEntry(2, 101, "foo"));
303 // The new commitIndex is 101
304 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
306 follower = createBehavior(context);
307 follower.handleMessage(leaderActor, appendEntries);
309 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
310 FollowerInitialSyncUpStatus.class);
311 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
313 assertFalse(syncStatus.isInitialSyncDone());
314 assertTrue("append entries reply should be true", reply.isSuccess());
318 public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
320 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
322 MockRaftActorContext context = createActorContext();
323 context.getReplicatedLog().clear(0,2);
324 context.getReplicatedLog().setSnapshotIndex(100);
326 List<ReplicatedLogEntry> entries = Arrays.asList(
327 newReplicatedLogEntry(2, 105, "foo"));
329 // The new commitIndex is 101
330 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
332 follower = createBehavior(context);
333 follower.handleMessage(leaderActor, appendEntries);
335 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
336 FollowerInitialSyncUpStatus.class);
337 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
339 assertFalse(syncStatus.isInitialSyncDone());
340 assertFalse("append entries reply should be false", reply.isSuccess());
344 public void testHandleSyncUpAppendEntries() {
345 logStart("testHandleSyncUpAppendEntries");
347 MockRaftActorContext context = createActorContext();
349 List<ReplicatedLogEntry> entries = Arrays.asList(
350 newReplicatedLogEntry(2, 101, "foo"));
352 // The new commitIndex is 101
353 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
355 follower = createBehavior(context);
356 follower.handleMessage(leaderActor, appendEntries);
358 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
359 FollowerInitialSyncUpStatus.class);
361 assertFalse(syncStatus.isInitialSyncDone());
363 // Clear all the messages
364 MessageCollectorActor.clearMessages(followerActor);
366 context.setLastApplied(101);
367 context.setCommitIndex(101);
368 setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
370 entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
372 // The new commitIndex is 101
373 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
374 follower.handleMessage(leaderActor, appendEntries);
376 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
378 assertTrue(syncStatus.isInitialSyncDone());
380 MessageCollectorActor.clearMessages(followerActor);
382 // Sending the same message again should not generate another message
384 follower.handleMessage(leaderActor, appendEntries);
386 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
388 assertNull(syncStatus);
392 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
393 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
395 MockRaftActorContext context = createActorContext();
397 List<ReplicatedLogEntry> entries = Arrays.asList(
398 newReplicatedLogEntry(2, 101, "foo"));
400 // The new commitIndex is 101
401 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
403 follower = createBehavior(context);
404 follower.handleMessage(leaderActor, appendEntries);
406 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
407 FollowerInitialSyncUpStatus.class);
409 assertFalse(syncStatus.isInitialSyncDone());
411 // Clear all the messages
412 MessageCollectorActor.clearMessages(followerActor);
414 context.setLastApplied(100);
415 setLastLogEntry(context, 1, 100,
416 new MockRaftActorContext.MockPayload(""));
418 entries = Arrays.asList(
419 newReplicatedLogEntry(2, 101, "foo"));
421 // leader-2 is becoming the leader now and it says the commitIndex is 45
422 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
423 follower.handleMessage(leaderActor, appendEntries);
425 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
427 // We get a new message saying initial status is not done
428 assertFalse(syncStatus.isInitialSyncDone());
432 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
433 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
435 MockRaftActorContext context = createActorContext();
437 List<ReplicatedLogEntry> entries = Arrays.asList(
438 newReplicatedLogEntry(2, 101, "foo"));
440 // The new commitIndex is 101
441 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
443 follower = createBehavior(context);
444 follower.handleMessage(leaderActor, appendEntries);
446 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
447 FollowerInitialSyncUpStatus.class);
449 assertFalse(syncStatus.isInitialSyncDone());
451 // Clear all the messages
452 MessageCollectorActor.clearMessages(followerActor);
454 context.setLastApplied(101);
455 context.setCommitIndex(101);
456 setLastLogEntry(context, 1, 101,
457 new MockRaftActorContext.MockPayload(""));
459 entries = Arrays.asList(
460 newReplicatedLogEntry(2, 101, "foo"));
462 // The new commitIndex is 101
463 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
464 follower.handleMessage(leaderActor, appendEntries);
466 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
468 assertTrue(syncStatus.isInitialSyncDone());
470 // Clear all the messages
471 MessageCollectorActor.clearMessages(followerActor);
473 context.setLastApplied(100);
474 setLastLogEntry(context, 1, 100,
475 new MockRaftActorContext.MockPayload(""));
477 entries = Arrays.asList(
478 newReplicatedLogEntry(2, 101, "foo"));
480 // leader-2 is becoming the leader now and it says the commitIndex is 45
481 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
482 follower.handleMessage(leaderActor, appendEntries);
484 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
486 // We get a new message saying initial status is not done
487 assertFalse(syncStatus.isInitialSyncDone());
491 * This test verifies that when an AppendEntries RPC is received by a RaftActor
492 * with a commitIndex that is greater than what has been applied to the
493 * state machine of the RaftActor, the RaftActor applies the state and
494 * sets it current applied state to the commitIndex of the sender.
497 public void testHandleAppendEntriesWithNewerCommitIndex() {
498 logStart("testHandleAppendEntriesWithNewerCommitIndex");
500 MockRaftActorContext context = createActorContext();
502 context.setLastApplied(100);
503 setLastLogEntry(context, 1, 100,
504 new MockRaftActorContext.MockPayload(""));
505 context.getReplicatedLog().setSnapshotIndex(99);
507 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
508 newReplicatedLogEntry(2, 101, "foo"));
510 // The new commitIndex is 101
511 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
513 follower = createBehavior(context);
514 follower.handleMessage(leaderActor, appendEntries);
516 assertEquals("getLastApplied", 101L, context.getLastApplied());
520 * This test verifies that when an AppendEntries is received with a prevLogTerm
521 * which does not match the term that is in RaftActors log entry at prevLogIndex
522 * then the RaftActor does not change it's state and it returns a failure.
525 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
526 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
528 MockRaftActorContext context = createActorContext();
530 AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
532 follower = createBehavior(context);
534 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
536 Assert.assertSame(follower, newBehavior);
538 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
539 AppendEntriesReply.class);
541 assertEquals("isSuccess", false, reply.isSuccess());
545 public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
546 logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
548 MockRaftActorContext context = createActorContext();
549 context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
550 context.getReplicatedLog().setSnapshotIndex(4);
551 context.getReplicatedLog().setSnapshotTerm(3);
553 AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
555 follower = createBehavior(context);
557 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
559 Assert.assertSame(follower, newBehavior);
561 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
563 assertEquals("isSuccess", true, reply.isSuccess());
567 * This test verifies that when a new AppendEntries message is received with
568 * new entries and the logs of the sender and receiver match that the new
569 * entries get added to the log and the log is incremented by the number of
570 * entries received in appendEntries.
573 public void testHandleAppendEntriesAddNewEntries() {
574 logStart("testHandleAppendEntriesAddNewEntries");
576 MockRaftActorContext context = createActorContext();
578 // First set the receivers term to lower number
579 context.getTermInformation().update(1, "test");
581 // Prepare the receivers log
582 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
583 log.append(newReplicatedLogEntry(1, 0, "zero"));
584 log.append(newReplicatedLogEntry(1, 1, "one"));
585 log.append(newReplicatedLogEntry(1, 2, "two"));
587 context.setReplicatedLog(log);
589 // Prepare the entries to be sent with AppendEntries
590 List<ReplicatedLogEntry> entries = new ArrayList<>();
591 entries.add(newReplicatedLogEntry(1, 3, "three"));
592 entries.add(newReplicatedLogEntry(1, 4, "four"));
594 // Send appendEntries with the same term as was set on the receiver
595 // before the new behavior was created (1 in this case)
596 // This will not work for a Candidate because as soon as a Candidate
597 // is created it increments the term
598 short leaderPayloadVersion = 10;
599 String leaderId = "leader-1";
600 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
602 follower = createBehavior(context);
604 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
606 Assert.assertSame(follower, newBehavior);
608 assertEquals("Next index", 5, log.last().getIndex() + 1);
609 assertEquals("Entry 3", entries.get(0), log.get(3));
610 assertEquals("Entry 4", entries.get(1), log.get(4));
612 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
613 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
615 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
619 * This test verifies that when a new AppendEntries message is received with
620 * new entries and the logs of the sender and receiver are out-of-sync that
621 * the log is first corrected by removing the out of sync entries from the
622 * log and then adding in the new entries sent with the AppendEntries message.
625 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
626 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
628 MockRaftActorContext context = createActorContext();
630 // First set the receivers term to lower number
631 context.getTermInformation().update(1, "test");
633 // Prepare the receivers log
634 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
635 log.append(newReplicatedLogEntry(1, 0, "zero"));
636 log.append(newReplicatedLogEntry(1, 1, "one"));
637 log.append(newReplicatedLogEntry(1, 2, "two"));
639 context.setReplicatedLog(log);
641 // Prepare the entries to be sent with AppendEntries
642 List<ReplicatedLogEntry> entries = new ArrayList<>();
643 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
644 entries.add(newReplicatedLogEntry(2, 3, "three"));
646 // Send appendEntries with the same term as was set on the receiver
647 // before the new behavior was created (1 in this case)
648 // This will not work for a Candidate because as soon as a Candidate
649 // is created it increments the term
650 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
652 follower = createBehavior(context);
654 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
656 Assert.assertSame(follower, newBehavior);
658 // The entry at index 2 will be found out-of-sync with the leader
659 // and will be removed
660 // Then the two new entries will be added to the log
661 // Thus making the log to have 4 entries
662 assertEquals("Next index", 4, log.last().getIndex() + 1);
663 //assertEquals("Entry 2", entries.get(0), log.get(2));
665 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
667 // Check that the entry at index 2 has the new data
668 assertEquals("Entry 2", entries.get(0), log.get(2));
670 assertEquals("Entry 3", entries.get(1), log.get(3));
672 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
676 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
677 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
679 MockRaftActorContext context = createActorContext();
681 // First set the receivers term to lower number
682 context.getTermInformation().update(1, "test");
684 // Prepare the receivers log
685 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
686 log.append(newReplicatedLogEntry(1, 0, "zero"));
687 log.append(newReplicatedLogEntry(1, 1, "one"));
688 log.append(newReplicatedLogEntry(1, 2, "two"));
690 context.setReplicatedLog(log);
692 // Prepare the entries to be sent with AppendEntries
693 List<ReplicatedLogEntry> entries = new ArrayList<>();
694 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
695 entries.add(newReplicatedLogEntry(2, 3, "three"));
697 // Send appendEntries with the same term as was set on the receiver
698 // before the new behavior was created (1 in this case)
699 // This will not work for a Candidate because as soon as a Candidate
700 // is created it increments the term
701 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
703 context.setRaftPolicy(createRaftPolicy(false, true));
704 follower = createBehavior(context);
706 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
708 Assert.assertSame(follower, newBehavior);
710 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
714 public void testHandleAppendEntriesPreviousLogEntryMissing() {
715 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
717 final MockRaftActorContext context = createActorContext();
719 // Prepare the receivers log
720 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
721 log.append(newReplicatedLogEntry(1, 0, "zero"));
722 log.append(newReplicatedLogEntry(1, 1, "one"));
723 log.append(newReplicatedLogEntry(1, 2, "two"));
725 context.setReplicatedLog(log);
727 // Prepare the entries to be sent with AppendEntries
728 List<ReplicatedLogEntry> entries = new ArrayList<>();
729 entries.add(newReplicatedLogEntry(1, 4, "four"));
731 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
733 follower = createBehavior(context);
735 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
737 Assert.assertSame(follower, newBehavior);
739 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
743 public void testHandleAppendEntriesWithExistingLogEntry() {
744 logStart("testHandleAppendEntriesWithExistingLogEntry");
746 MockRaftActorContext context = createActorContext();
748 context.getTermInformation().update(1, "test");
750 // Prepare the receivers log
751 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
752 log.append(newReplicatedLogEntry(1, 0, "zero"));
753 log.append(newReplicatedLogEntry(1, 1, "one"));
755 context.setReplicatedLog(log);
757 // Send the last entry again.
758 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
760 follower = createBehavior(context);
762 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
764 assertEquals("Next index", 2, log.last().getIndex() + 1);
765 assertEquals("Entry 1", entries.get(0), log.get(1));
767 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
769 // Send the last entry again and also a new one.
771 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
773 MessageCollectorActor.clearMessages(leaderActor);
774 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
776 assertEquals("Next index", 3, log.last().getIndex() + 1);
777 assertEquals("Entry 1", entries.get(0), log.get(1));
778 assertEquals("Entry 2", entries.get(1), log.get(2));
780 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
784 public void testHandleAppendEntriesAfterInstallingSnapshot() {
785 logStart("testHandleAppendAfterInstallingSnapshot");
787 MockRaftActorContext context = createActorContext();
789 // Prepare the receivers log
790 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
792 // Set up a log as if it has been snapshotted
793 log.setSnapshotIndex(3);
794 log.setSnapshotTerm(1);
796 context.setReplicatedLog(log);
798 // Prepare the entries to be sent with AppendEntries
799 List<ReplicatedLogEntry> entries = new ArrayList<>();
800 entries.add(newReplicatedLogEntry(1, 4, "four"));
802 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
804 follower = createBehavior(context);
806 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
808 Assert.assertSame(follower, newBehavior);
810 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
814 * This test verifies that when InstallSnapshot is received by
815 * the follower its applied correctly.
818 public void testHandleInstallSnapshot() {
819 logStart("testHandleInstallSnapshot");
821 MockRaftActorContext context = createActorContext();
822 context.getTermInformation().update(1, "leader");
824 follower = createBehavior(context);
826 ByteString bsSnapshot = createSnapshot();
828 int snapshotLength = bsSnapshot.size();
830 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
831 int lastIncludedIndex = 1;
833 InstallSnapshot lastInstallSnapshot = null;
835 for (int i = 0; i < totalChunks; i++) {
836 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
837 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
838 chunkData, chunkIndex, totalChunks);
839 follower.handleMessage(leaderActor, lastInstallSnapshot);
840 offset = offset + 50;
845 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
846 ApplySnapshot.class);
847 Snapshot snapshot = applySnapshot.getSnapshot();
848 assertNotNull(lastInstallSnapshot);
849 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
850 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
851 snapshot.getLastAppliedTerm());
852 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
853 snapshot.getLastAppliedIndex());
854 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
855 assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
856 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
857 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
858 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
859 applySnapshot.getCallback().onSuccess();
861 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
862 leaderActor, InstallSnapshotReply.class);
863 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
866 for (InstallSnapshotReply reply: replies) {
867 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
868 assertEquals("getTerm", 1, reply.getTerm());
869 assertEquals("isSuccess", true, reply.isSuccess());
870 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
873 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
877 * Verify that when an AppendEntries is sent to a follower during a snapshot install
878 * the Follower short-circuits the processing of the AppendEntries message.
881 public void testReceivingAppendEntriesDuringInstallSnapshot() {
882 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
884 MockRaftActorContext context = createActorContext();
886 follower = createBehavior(context);
888 ByteString bsSnapshot = createSnapshot();
889 int snapshotLength = bsSnapshot.size();
891 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
892 int lastIncludedIndex = 1;
894 // Check that snapshot installation is not in progress
895 assertNull(follower.getSnapshotTracker());
897 // Make sure that we have more than 1 chunk to send
898 assertTrue(totalChunks > 1);
900 // Send an install snapshot with the first chunk to start the process of installing a snapshot
901 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
902 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
903 chunkData, 1, totalChunks));
905 // Check if snapshot installation is in progress now
906 assertNotNull(follower.getSnapshotTracker());
908 // Send an append entry
909 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
910 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
912 follower.handleMessage(leaderActor, appendEntries);
914 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
915 assertEquals("isSuccess", true, reply.isSuccess());
916 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
917 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
918 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
920 assertNotNull(follower.getSnapshotTracker());
924 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
925 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
927 MockRaftActorContext context = createActorContext();
929 follower = createBehavior(context);
931 ByteString bsSnapshot = createSnapshot();
932 int snapshotLength = bsSnapshot.size();
934 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
935 int lastIncludedIndex = 1;
937 // Check that snapshot installation is not in progress
938 assertNull(follower.getSnapshotTracker());
940 // Make sure that we have more than 1 chunk to send
941 assertTrue(totalChunks > 1);
943 // Send an install snapshot with the first chunk to start the process of installing a snapshot
944 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
945 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
946 chunkData, 1, totalChunks));
948 // Check if snapshot installation is in progress now
949 assertNotNull(follower.getSnapshotTracker());
951 // Send appendEntries with a new term and leader.
952 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
953 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
955 follower.handleMessage(leaderActor, appendEntries);
957 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
958 assertEquals("isSuccess", true, reply.isSuccess());
959 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
960 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
961 assertEquals("getTerm", 2, reply.getTerm());
963 assertNull(follower.getSnapshotTracker());
967 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
968 logStart("testInitialSyncUpWithHandleInstallSnapshot");
970 MockRaftActorContext context = createActorContext();
971 context.setCommitIndex(-1);
973 follower = createBehavior(context);
975 ByteString bsSnapshot = createSnapshot();
977 int snapshotLength = bsSnapshot.size();
979 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
980 int lastIncludedIndex = 1;
982 InstallSnapshot lastInstallSnapshot = null;
984 for (int i = 0; i < totalChunks; i++) {
985 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
986 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
987 chunkData, chunkIndex, totalChunks);
988 follower.handleMessage(leaderActor, lastInstallSnapshot);
989 offset = offset + 50;
994 FollowerInitialSyncUpStatus syncStatus =
995 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
997 assertFalse(syncStatus.isInitialSyncDone());
999 // Clear all the messages
1000 MessageCollectorActor.clearMessages(followerActor);
1002 context.setLastApplied(101);
1003 context.setCommitIndex(101);
1004 setLastLogEntry(context, 1, 101,
1005 new MockRaftActorContext.MockPayload(""));
1007 List<ReplicatedLogEntry> entries = Arrays.asList(
1008 newReplicatedLogEntry(2, 101, "foo"));
1010 // The new commitIndex is 101
1011 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
1012 follower.handleMessage(leaderActor, appendEntries);
1014 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
1016 assertTrue(syncStatus.isInitialSyncDone());
1020 public void testHandleOutOfSequenceInstallSnapshot() {
1021 logStart("testHandleOutOfSequenceInstallSnapshot");
1023 MockRaftActorContext context = createActorContext();
1025 follower = createBehavior(context);
1027 ByteString bsSnapshot = createSnapshot();
1029 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1030 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1031 follower.handleMessage(leaderActor, installSnapshot);
1033 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1034 InstallSnapshotReply.class);
1036 assertEquals("isSuccess", false, reply.isSuccess());
1037 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1038 assertEquals("getTerm", 1, reply.getTerm());
1039 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1041 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1045 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1046 MockRaftActorContext context = createActorContext();
1048 Stopwatch stopwatch = Stopwatch.createStarted();
1050 follower = createBehavior(context);
1052 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1054 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1056 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1058 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1059 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1063 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1064 MockRaftActorContext context = createActorContext();
1065 context.setConfigParams(new DefaultConfigParamsImpl() {
1067 public FiniteDuration getElectionTimeOutInterval() {
1068 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1072 context.setRaftPolicy(createRaftPolicy(false, false));
1074 follower = createBehavior(context);
1076 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1077 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1078 assertSame("handleMessage result", follower, newBehavior);
1082 public void testFollowerSchedulesElectionIfNonVoting() {
1083 MockRaftActorContext context = createActorContext();
1084 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1085 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1086 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1087 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1089 follower = new Follower(context, "leader", (short)1);
1091 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1092 ElectionTimeout.class);
1093 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1094 assertSame("handleMessage result", follower, newBehavior);
1095 assertNull("Expected null leaderId", follower.getLeaderId());
1099 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1100 MockRaftActorContext context = createActorContext();
1101 follower = createBehavior(context);
1102 follower.handleMessage(leaderActor, new RaftRPC() {
1103 private static final long serialVersionUID = 1L;
1106 public long getTerm() {
1110 verify(follower).scheduleElection(any(FiniteDuration.class));
1114 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1115 MockRaftActorContext context = createActorContext();
1116 follower = createBehavior(context);
1117 follower.handleMessage(leaderActor, "non-raft-rpc");
1118 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1122 public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1123 String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1126 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1128 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1129 config.setSnapshotBatchCount(2);
1130 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1132 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1133 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1134 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1135 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1136 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1137 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1138 followerRaftActor.set(followerActorRef.underlyingActor());
1139 followerRaftActor.get().waitForInitializeBehaviorComplete();
1141 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1142 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1143 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1145 List<ReplicatedLogEntry> entries = Arrays.asList(
1146 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1148 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1150 followerActorRef.tell(appendEntries, leaderActor);
1152 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1153 assertEquals("isSuccess", true, reply.isSuccess());
1155 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1157 InMemoryJournal.waitForDeleteMessagesComplete(id);
1158 InMemoryJournal.waitForWriteMessagesComplete(id);
1159 // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1160 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1161 // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1162 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1163 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1164 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1165 assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1167 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1168 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1169 assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1170 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1171 assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1172 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1173 MockRaftActor.fromState(snapshot.getState()));
1177 public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1178 String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1181 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1183 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1184 config.setSnapshotBatchCount(2);
1185 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1187 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1188 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1189 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1190 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1191 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1192 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1193 followerRaftActor.set(followerActorRef.underlyingActor());
1194 followerRaftActor.get().waitForInitializeBehaviorComplete();
1196 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1197 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1198 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1200 List<ReplicatedLogEntry> entries = Arrays.asList(
1201 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1202 newReplicatedLogEntry(1, 2, "three"));
1204 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1206 followerActorRef.tell(appendEntries, leaderActor);
1208 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1209 assertEquals("isSuccess", true, reply.isSuccess());
1211 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1213 InMemoryJournal.waitForDeleteMessagesComplete(id);
1214 InMemoryJournal.waitForWriteMessagesComplete(id);
1215 // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1216 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1217 // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1218 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1219 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1220 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1221 assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1223 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1224 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1225 assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1226 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1227 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1228 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1229 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1231 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1232 assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1234 // Reinstate the actor from persistence
1236 actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1238 followerActorRef = actorFactory.createTestActor(builder.props()
1239 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1240 followerRaftActor.set(followerActorRef.underlyingActor());
1241 followerRaftActor.get().waitForInitializeBehaviorComplete();
1243 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1244 assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1245 assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1246 assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1247 assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1248 entries.get(2).getData()), followerRaftActor.get().getState());
1252 public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1253 String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1256 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1258 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1259 config.setSnapshotBatchCount(1);
1260 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1262 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1263 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1264 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1265 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1266 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1267 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1268 followerRaftActor.set(followerActorRef.underlyingActor());
1269 followerRaftActor.get().waitForInitializeBehaviorComplete();
1271 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1272 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1273 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1275 List<ReplicatedLogEntry> entries = Arrays.asList(
1276 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1277 newReplicatedLogEntry(1, 2, "three"));
1279 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1281 followerActorRef.tell(appendEntries, leaderActor);
1283 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1284 assertEquals("isSuccess", true, reply.isSuccess());
1286 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1288 InMemoryJournal.waitForDeleteMessagesComplete(id);
1289 InMemoryJournal.waitForWriteMessagesComplete(id);
1290 // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1291 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1292 // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1293 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1294 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1295 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1296 assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1298 assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1299 assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1300 assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1301 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1302 assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1303 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1304 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1305 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1306 MockRaftActor.fromState(snapshot.getState()));
1310 public void testNeedsLeaderAddress() {
1311 logStart("testNeedsLeaderAddress");
1313 MockRaftActorContext context = createActorContext();
1314 context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
1315 context.addToPeers("leader", null, VotingState.VOTING);
1316 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
1318 follower = createBehavior(context);
1320 follower.handleMessage(leaderActor,
1321 new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0));
1323 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1324 assertTrue(reply.isNeedsLeaderAddress());
1325 MessageCollectorActor.clearMessages(leaderActor);
1327 PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
1328 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
1330 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1,
1331 (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
1333 reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1334 assertFalse(reply.isNeedsLeaderAddress());
1336 verify(mockResolver).setResolved("leader", leaderActor.path().toString());
1339 @SuppressWarnings("checkstyle:IllegalCatch")
1340 private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1341 final AtomicReference<MockRaftActor> followerRaftActor) {
1342 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1344 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1346 actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1347 installSnapshotStream), actorRef);
1348 } catch (RuntimeException e) {
1350 } catch (Exception e) {
1351 throw new RuntimeException(e);
1356 public void applySnapshot(final State snapshotState) {
1360 public State deserializeSnapshot(final ByteSource snapshotBytes) {
1361 throw new UnsupportedOperationException();
1364 return snapshotCohort;
1367 public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1368 int snapshotLength = bs.size();
1370 int size = chunkSize;
1371 if (chunkSize > snapshotLength) {
1372 size = snapshotLength;
1374 if (start + chunkSize > snapshotLength) {
1375 size = snapshotLength - start;
1379 byte[] nextChunk = new byte[size];
1380 bs.copyTo(nextChunk, start, 0, size);
1384 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1385 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1386 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1389 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1390 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1391 final boolean expForceInstallSnapshot) {
1393 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1394 AppendEntriesReply.class);
1396 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1397 assertEquals("getTerm", expTerm, reply.getTerm());
1398 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1399 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1400 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1401 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1402 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1403 assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
1407 private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1408 return new SimpleReplicatedLogEntry(index, term,
1409 new MockRaftActorContext.MockPayload(data));
1412 private ByteString createSnapshot() {
1413 HashMap<String, String> followerSnapshot = new HashMap<>();
1414 followerSnapshot.put("1", "A");
1415 followerSnapshot.put("2", "B");
1416 followerSnapshot.put("3", "C");
1418 return toByteString(followerSnapshot);
1422 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1423 final ActorRef actorRef, final RaftRPC rpc) {
1424 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1426 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1427 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1431 protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1432 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1433 assertEquals("isSuccess", true, reply.isSuccess());