2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertArrayEquals;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.verify;
23 import com.google.common.base.Stopwatch;
24 import com.google.common.io.ByteSource;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.io.OutputStream;
27 import java.util.List;
29 import java.util.Optional;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.apache.pekko.actor.ActorRef;
33 import org.apache.pekko.dispatch.Dispatchers;
34 import org.apache.pekko.protobuf.ByteString;
35 import org.apache.pekko.testkit.TestActorRef;
36 import org.apache.pekko.testkit.javadsl.TestKit;
37 import org.junit.After;
38 import org.junit.Test;
39 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
40 import org.opendaylight.controller.cluster.raft.MockRaftActor;
41 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
42 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
43 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
44 import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
45 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
46 import org.opendaylight.controller.cluster.raft.RaftActorContext;
47 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
48 import org.opendaylight.controller.cluster.raft.RaftVersions;
49 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.VotingState;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
53 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
54 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
55 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
69 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
72 import org.opendaylight.controller.cluster.raft.spi.TermInfo;
73 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
74 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import scala.concurrent.duration.FiniteDuration;
78 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
79 private final short payloadVersion = 5;
81 private final ActorRef followerActor = actorFactory.createActor(
82 MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
83 private final ActorRef leaderActor = actorFactory.createActor(
84 MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
86 private Follower follower;
90 public void tearDown() {
91 if (follower != null) {
99 protected Follower createBehavior(final RaftActorContext actorContext) {
100 return spy(new Follower(actorContext));
104 protected MockRaftActorContext createActorContext() {
105 return createActorContext(followerActor);
109 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
110 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
111 context.setPayloadVersion(payloadVersion);
112 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
113 peerId -> leaderActor.path().toString());
118 public void testThatAnElectionTimeoutIsTriggered() {
119 MockRaftActorContext actorContext = createActorContext();
120 follower = new Follower(actorContext);
122 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
123 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
127 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
128 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
130 MockRaftActorContext context = createActorContext();
131 follower = new Follower(context);
133 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
134 TimeUnit.MILLISECONDS);
135 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
137 assertTrue(raftBehavior instanceof Candidate);
141 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
142 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
144 MockRaftActorContext context = createActorContext();
145 ((DefaultConfigParamsImpl) context.getConfigParams())
146 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
147 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
149 follower = new Follower(context);
150 context.setCurrentBehavior(follower);
152 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
153 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
154 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
157 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
158 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
159 assertTrue(raftBehavior instanceof Follower);
161 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
162 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
163 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
166 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
167 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
168 assertTrue(raftBehavior instanceof Follower);
172 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
173 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
175 MockRaftActorContext context = createActorContext();
177 context.setTermInfo(new TermInfo(term, null));
179 follower = createBehavior(context);
181 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
183 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
185 assertEquals("isVoteGranted", true, reply.isVoteGranted());
186 assertEquals("getTerm", term, reply.getTerm());
187 verify(follower).scheduleElection(any(FiniteDuration.class));
191 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
192 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
194 MockRaftActorContext context = createActorContext();
196 context.setTermInfo(new TermInfo(term, "test"));
198 follower = createBehavior(context);
200 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
202 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
204 assertEquals("isVoteGranted", false, reply.isVoteGranted());
205 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
210 public void testHandleFirstAppendEntries() {
211 logStart("testHandleFirstAppendEntries");
213 MockRaftActorContext context = createActorContext();
214 context.getReplicatedLog().clear(0,2);
215 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
216 context.getReplicatedLog().setSnapshotIndex(99);
218 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
220 assertEquals(1, context.getReplicatedLog().size());
222 // The new commitIndex is 101
223 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
225 follower = createBehavior(context);
226 follower.handleMessage(leaderActor, appendEntries);
228 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
229 FollowerInitialSyncUpStatus.class);
230 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
232 assertFalse(syncStatus.isInitialSyncDone());
233 assertTrue("append entries reply should be true", reply.isSuccess());
237 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
238 logStart("testHandleFirstAppendEntries");
240 MockRaftActorContext context = createActorContext();
242 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
244 // The new commitIndex is 101
245 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
247 follower = createBehavior(context);
248 follower.handleMessage(leaderActor, appendEntries);
250 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
251 FollowerInitialSyncUpStatus.class);
252 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
254 assertFalse(syncStatus.isInitialSyncDone());
255 assertFalse("append entries reply should be false", reply.isSuccess());
259 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
260 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
262 MockRaftActorContext context = createActorContext();
263 context.getReplicatedLog().clear(0,2);
264 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
265 context.getReplicatedLog().setSnapshotIndex(99);
267 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
269 // The new commitIndex is 101
270 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
272 follower = createBehavior(context);
273 follower.handleMessage(leaderActor, appendEntries);
275 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
276 FollowerInitialSyncUpStatus.class);
277 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
279 assertFalse(syncStatus.isInitialSyncDone());
280 assertTrue("append entries reply should be true", reply.isSuccess());
284 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
285 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
287 MockRaftActorContext context = createActorContext();
288 context.getReplicatedLog().clear(0,2);
289 context.getReplicatedLog().setSnapshotIndex(100);
291 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
293 // The new commitIndex is 101
294 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
296 follower = createBehavior(context);
297 follower.handleMessage(leaderActor, appendEntries);
299 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
300 FollowerInitialSyncUpStatus.class);
301 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
303 assertFalse(syncStatus.isInitialSyncDone());
304 assertTrue("append entries reply should be true", reply.isSuccess());
308 public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
310 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
312 MockRaftActorContext context = createActorContext();
313 context.getReplicatedLog().clear(0,2);
314 context.getReplicatedLog().setSnapshotIndex(100);
316 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 105, "foo"));
318 // The new commitIndex is 105
319 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
321 follower = createBehavior(context);
322 follower.handleMessage(leaderActor, appendEntries);
324 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
325 FollowerInitialSyncUpStatus.class);
326 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
328 assertFalse(syncStatus.isInitialSyncDone());
329 assertFalse("append entries reply should be false", reply.isSuccess());
333 public void testHandleSyncUpAppendEntries() {
334 logStart("testHandleSyncUpAppendEntries");
336 MockRaftActorContext context = createActorContext();
338 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
340 // The new commitIndex is 101
341 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
343 follower = createBehavior(context);
344 follower.handleMessage(leaderActor, appendEntries);
346 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
347 FollowerInitialSyncUpStatus.class);
349 assertFalse(syncStatus.isInitialSyncDone());
351 // Clear all the messages
352 MessageCollectorActor.clearMessages(followerActor);
354 context.setLastApplied(101);
355 context.setCommitIndex(101);
356 setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
358 entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
360 // The new commitIndex is 102
361 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
362 follower.handleMessage(leaderActor, appendEntries);
364 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
366 assertTrue(syncStatus.isInitialSyncDone());
368 MessageCollectorActor.clearMessages(followerActor);
370 // Sending the same message again should not generate another message
372 follower.handleMessage(leaderActor, appendEntries);
374 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
376 assertNull(syncStatus);
380 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
381 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
383 MockRaftActorContext context = createActorContext();
385 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
387 // The new commitIndex is 101
388 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
390 follower = createBehavior(context);
391 follower.handleMessage(leaderActor, appendEntries);
393 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
394 FollowerInitialSyncUpStatus.class);
396 assertFalse(syncStatus.isInitialSyncDone());
398 // Clear all the messages
399 MessageCollectorActor.clearMessages(followerActor);
401 context.setLastApplied(100);
402 setLastLogEntry(context, 1, 100,
403 new MockRaftActorContext.MockPayload(""));
405 entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
407 // leader-2 is becoming the leader now and it sends prevLogIndex 45 with commitIndex 46
408 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
409 follower.handleMessage(leaderActor, appendEntries);
411 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
413 // We get a new message saying initial status is not done
414 assertFalse(syncStatus.isInitialSyncDone());
418 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
419 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
421 MockRaftActorContext context = createActorContext();
423 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
425 // The new commitIndex is 101
426 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
428 follower = createBehavior(context);
429 follower.handleMessage(leaderActor, appendEntries);
431 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
432 FollowerInitialSyncUpStatus.class);
434 assertFalse(syncStatus.isInitialSyncDone());
436 // Clear all the messages
437 MessageCollectorActor.clearMessages(followerActor);
439 context.setLastApplied(101);
440 context.setCommitIndex(101);
441 setLastLogEntry(context, 1, 101,
442 new MockRaftActorContext.MockPayload(""));
444 entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
446 // The new commitIndex is 102
447 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
448 follower.handleMessage(leaderActor, appendEntries);
450 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
452 assertTrue(syncStatus.isInitialSyncDone());
454 // Clear all the messages
455 MessageCollectorActor.clearMessages(followerActor);
457 context.setLastApplied(100);
458 setLastLogEntry(context, 1, 100,
459 new MockRaftActorContext.MockPayload(""));
461 entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
463 // leader-2 is becoming the leader now and it sends prevLogIndex 45 with commitIndex 46
464 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
465 follower.handleMessage(leaderActor, appendEntries);
467 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
469 // We get a new message saying initial status is not done
470 assertFalse(syncStatus.isInitialSyncDone());
474 * This test verifies that when an AppendEntries RPC is received by a RaftActor
475 * with a commitIndex that is greater than what has been applied to the
476 * state machine of the RaftActor, the RaftActor applies the state and
477 * sets its current applied state to the commitIndex of the sender.
480 public void testHandleAppendEntriesWithNewerCommitIndex() {
481 logStart("testHandleAppendEntriesWithNewerCommitIndex");
483 MockRaftActorContext context = createActorContext();
485 context.setLastApplied(100);
486 setLastLogEntry(context, 1, 100,
487 new MockRaftActorContext.MockPayload(""));
488 context.getReplicatedLog().setSnapshotIndex(99);
490 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
492 // The new commitIndex is 101
493 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
495 follower = createBehavior(context);
496 follower.handleMessage(leaderActor, appendEntries);
498 assertEquals("getLastApplied", 101L, context.getLastApplied());
502 * This test verifies that when an AppendEntries is received with a prevLogTerm
503 * which does not match the term that is in RaftActors log entry at prevLogIndex
504 * then the RaftActor does not change its state and it returns a failure.
507 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
508 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
510 MockRaftActorContext context = createActorContext();
512 AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, List.of(), 101, -1, (short)0);
514 follower = createBehavior(context);
516 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
518 assertSame(follower, newBehavior);
520 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
521 AppendEntriesReply.class);
523 assertEquals("isSuccess", false, reply.isSuccess());
527 public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
528 logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
530 MockRaftActorContext context = createActorContext();
531 context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
532 context.getReplicatedLog().setSnapshotIndex(4);
533 context.getReplicatedLog().setSnapshotTerm(3);
535 AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, List.of(), 8, -1, (short)0);
537 follower = createBehavior(context);
539 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
541 assertSame(follower, newBehavior);
543 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
545 assertEquals("isSuccess", true, reply.isSuccess());
549 * This test verifies that when a new AppendEntries message is received with
550 * new entries and the logs of the sender and receiver match that the new
551 * entries get added to the log and the log is incremented by the number of
552 * entries received in appendEntries.
555 public void testHandleAppendEntriesAddNewEntries() {
556 logStart("testHandleAppendEntriesAddNewEntries");
558 MockRaftActorContext context = createActorContext();
560 // First set the receivers term to lower number
561 context.setTermInfo(new TermInfo(1, "test"));
563 // Prepare the receivers log
564 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
565 log.append(newReplicatedLogEntry(1, 0, "zero"));
566 log.append(newReplicatedLogEntry(1, 1, "one"));
567 log.append(newReplicatedLogEntry(1, 2, "two"));
569 context.setReplicatedLog(log);
571 // Prepare the entries to be sent with AppendEntries
572 List<ReplicatedLogEntry> entries = List.of(
573 newReplicatedLogEntry(1, 3, "three"), newReplicatedLogEntry(1, 4, "four"));
575 // Send appendEntries with the same term as was set on the receiver
576 // before the new behavior was created (1 in this case)
577 // This will not work for a Candidate because as soon as a Candidate
578 // is created it increments the term
579 short leaderPayloadVersion = 10;
580 String leaderId = "leader-1";
581 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
583 follower = createBehavior(context);
585 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
587 assertSame(follower, newBehavior);
589 assertEquals("Next index", 5, log.last().index() + 1);
590 assertEquals("Entry 3", entries.get(0), log.get(3));
591 assertEquals("Entry 4", entries.get(1), log.get(4));
593 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
594 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
596 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
600 * This test verifies that when a new AppendEntries message is received with
601 * new entries and the logs of the sender and receiver are out-of-sync that
602 * the log is first corrected by removing the out of sync entries from the
603 * log and then adding in the new entries sent with the AppendEntries message.
606 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
607 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
609 MockRaftActorContext context = createActorContext();
611 // First set the receivers term to lower number
612 context.setTermInfo(new TermInfo(1, "test"));
614 // Prepare the receivers log
615 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
616 log.append(newReplicatedLogEntry(1, 0, "zero"));
617 log.append(newReplicatedLogEntry(1, 1, "one"));
618 log.append(newReplicatedLogEntry(1, 2, "two"));
620 context.setReplicatedLog(log);
622 // Prepare the entries to be sent with AppendEntries
623 List<ReplicatedLogEntry> entries = List.of(
624 newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
626 // Send appendEntries with a newer term (2) than the one that was set on the
627 // receiver before the new behavior was created (1 in this case)
628 // This will not work for a Candidate because as soon as a Candidate
629 // is created it increments the term
630 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
632 follower = createBehavior(context);
634 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
636 assertSame(follower, newBehavior);
638 // The entry at index 2 will be found out-of-sync with the leader
639 // and will be removed
640 // Then the two new entries will be added to the log
641 // Thus making the log to have 4 entries
642 assertEquals("Next index", 4, log.last().index() + 1);
643 //assertEquals("Entry 2", entries.get(0), log.get(2));
645 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
647 // Check that the entry at index 2 has the new data
648 assertEquals("Entry 2", entries.get(0), log.get(2));
650 assertEquals("Entry 3", entries.get(1), log.get(3));
652 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
656 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
657 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
659 MockRaftActorContext context = createActorContext();
661 // First set the receivers term to lower number
662 context.setTermInfo(new TermInfo(1, "test"));
664 // Prepare the receivers log
665 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
666 log.append(newReplicatedLogEntry(1, 0, "zero"));
667 log.append(newReplicatedLogEntry(1, 1, "one"));
668 log.append(newReplicatedLogEntry(1, 2, "two"));
670 context.setReplicatedLog(log);
672 // Prepare the entries to be sent with AppendEntries
673 List<ReplicatedLogEntry> entries = List.of(
674 newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
676 // Send appendEntries with a newer term (2) than the one that was set on the
677 // receiver before the new behavior was created (1 in this case)
678 // This will not work for a Candidate because as soon as a Candidate
679 // is created it increments the term
680 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
682 context.setRaftPolicy(createRaftPolicy(false, true));
683 follower = createBehavior(context);
685 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
687 assertSame(follower, newBehavior);
689 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
693 public void testHandleAppendEntriesPreviousLogEntryMissing() {
694 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
696 final MockRaftActorContext context = createActorContext();
698 // Prepare the receivers log
699 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
700 log.append(newReplicatedLogEntry(1, 0, "zero"));
701 log.append(newReplicatedLogEntry(1, 1, "one"));
702 log.append(newReplicatedLogEntry(1, 2, "two"));
704 context.setReplicatedLog(log);
706 // Prepare the entries to be sent with AppendEntries
707 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
709 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
711 follower = createBehavior(context);
713 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
715 assertSame(follower, newBehavior);
717 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
721 public void testHandleAppendEntriesWithExistingLogEntry() {
722 logStart("testHandleAppendEntriesWithExistingLogEntry");
724 MockRaftActorContext context = createActorContext();
726 context.setTermInfo(new TermInfo(1, "test"));
728 // Prepare the receivers log
729 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
730 log.append(newReplicatedLogEntry(1, 0, "zero"));
731 log.append(newReplicatedLogEntry(1, 1, "one"));
733 context.setReplicatedLog(log);
735 // Send the last entry again.
736 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 1, "one"));
738 follower = createBehavior(context);
740 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
742 assertEquals("Next index", 2, log.last().index() + 1);
743 assertEquals("Entry 1", entries.get(0), log.get(1));
745 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
747 // Send the last entry again and also a new one.
749 entries = List.of(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
751 MessageCollectorActor.clearMessages(leaderActor);
752 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
754 assertEquals("Next index", 3, log.last().index() + 1);
755 assertEquals("Entry 1", entries.get(0), log.get(1));
756 assertEquals("Entry 2", entries.get(1), log.get(2));
758 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
762 public void testHandleAppendEntriesAfterInstallingSnapshot() {
763 logStart("testHandleAppendAfterInstallingSnapshot");
765 MockRaftActorContext context = createActorContext();
767 // Prepare the receivers log
768 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
770 // Set up a log as if it has been snapshotted
771 log.setSnapshotIndex(3);
772 log.setSnapshotTerm(1);
774 context.setReplicatedLog(log);
776 // Prepare the entries to be sent with AppendEntries
777 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
779 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
781 follower = createBehavior(context);
783 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
785 assertSame(follower, newBehavior);
787 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
791 * This test verifies that when InstallSnapshot is received by
792 * the follower it is applied correctly.
// Sends the snapshot to the follower as a sequence of InstallSnapshot chunks, then checks the ApplySnapshot
// collected by followerActor, the InstallSnapshotReply messages collected by leaderActor, and that no snapshot
// tracker remains afterwards.
// NOTE(review): offset, chunkSize and chunkIndex are used below but their declarations are not among the
// lines visible here.
795 public void testHandleInstallSnapshot() {
796 logStart("testHandleInstallSnapshot");
798 MockRaftActorContext context = createActorContext();
799 context.setTermInfo(new TermInfo(1, "leader"));
801 follower = createBehavior(context);
803 ByteString bsSnapshot = createSnapshot();
805 int snapshotLength = bsSnapshot.size();
// Number of chunks, rounding up when the length is not a multiple of chunkSize
807 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
808 int lastIncludedIndex = 1;
810 InstallSnapshot lastInstallSnapshot = null;
// Hand every chunk to the follower; the last message sent is kept for the assertions below
812 for (int i = 0; i < totalChunks; i++) {
813 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
814 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
815 chunkData, chunkIndex, totalChunks);
816 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): advances by a literal 50 while the chunk length passed above is chunkSize -
// presumably the same value; confirm.
817 offset = offset + 50;
// An ApplySnapshot is expected among the messages collected by followerActor
822 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
823 ApplySnapshot.class);
824 Snapshot snapshot = applySnapshot.getSnapshot();
825 assertNotNull(lastInstallSnapshot);
// The snapshot's index/term fields must match the last InstallSnapshot that was sent
826 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
827 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
828 snapshot.getLastAppliedTerm());
829 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
830 snapshot.getLastAppliedIndex());
831 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
// The state must be a ByteState holding exactly the bytes that were chunked
832 assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
833 assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
834 assertEquals(new TermInfo(1, "leader"), snapshot.termInfo());
// Invoke the success callback carried by the ApplySnapshot message
835 applySnapshot.getCallback().onSuccess();
// One InstallSnapshotReply per chunk is expected among the messages collected by leaderActor
837 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
838 leaderActor, InstallSnapshotReply.class);
839 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
// Replies must carry consecutive chunk indexes, term 1, success and this follower's id
842 for (InstallSnapshotReply reply: replies) {
843 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
844 assertEquals("getTerm", 1, reply.getTerm());
845 assertEquals("isSuccess", true, reply.isSuccess());
846 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
849 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
853 * Verify that when an AppendEntries is sent to a follower during a snapshot install
854 * the Follower short-circuits the processing of the AppendEntries message.
// Starts a snapshot install by sending only the first chunk, then sends an AppendEntries with the same term (1)
// and leader id ("leader"). Expects a successful reply that reports the context's current last log index,
// last log term and current term, and expects the snapshot tracker to still be present.
// NOTE(review): chunkSize is used below but its declaration is not among the lines visible here.
857 public void testReceivingAppendEntriesDuringInstallSnapshot() {
858 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
860 MockRaftActorContext context = createActorContext();
862 follower = createBehavior(context);
864 ByteString bsSnapshot = createSnapshot();
865 int snapshotLength = bsSnapshot.size();
// Number of chunks, rounding up when the length is not a multiple of chunkSize
867 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
868 int lastIncludedIndex = 1;
870 // Check that snapshot installation is not in progress
871 assertNull(follower.getSnapshotTracker());
873 // Make sure that we have more than 1 chunk to send
874 assertTrue(totalChunks > 1);
876 // Send an install snapshot with the first chunk to start the process of installing a snapshot
877 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
878 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
879 chunkData, 1, totalChunks));
881 // Check if snapshot installation is in progress now
882 assertNotNull(follower.getSnapshotTracker());
884 // Send an append entry
885 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
886 List.of(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
888 follower.handleMessage(leaderActor, appendEntries);
// The reply is compared against the context's current log position and term
890 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
891 assertEquals("isSuccess", true, reply.isSuccess());
892 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
893 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
894 assertEquals("getTerm", context.currentTerm(), reply.getTerm());
// The snapshot install must still be in progress
896 assertNotNull(follower.getSnapshotTracker());
// Starts a snapshot install from "leader" in term 1 by sending only the first chunk, then sends an
// AppendEntries with term 2 and leader id "new-leader". Expects a successful reply reporting last log index 2,
// last log term 2 and term 2, and expects the snapshot tracker to be gone afterwards.
// NOTE(review): chunkSize is used below but its declaration is not among the lines visible here.
900 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
901 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
903 MockRaftActorContext context = createActorContext();
905 follower = createBehavior(context);
907 ByteString bsSnapshot = createSnapshot();
908 int snapshotLength = bsSnapshot.size();
// Number of chunks, rounding up when the length is not a multiple of chunkSize
910 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
911 int lastIncludedIndex = 1;
913 // Check that snapshot installation is not in progress
914 assertNull(follower.getSnapshotTracker());
916 // Make sure that we have more than 1 chunk to send
917 assertTrue(totalChunks > 1);
919 // Send an install snapshot with the first chunk to start the process of installing a snapshot
920 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
921 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
922 chunkData, 1, totalChunks));
924 // Check if snapshot installation is in progress now
925 assertNotNull(follower.getSnapshotTracker());
927 // Send appendEntries with a new term and leader.
928 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
929 List.of(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
931 follower.handleMessage(leaderActor, appendEntries);
// The reply must reflect the appended entry (term 2, index 2) and the new term
933 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
934 assertEquals("isSuccess", true, reply.isSuccess());
935 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
936 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
937 assertEquals("getTerm", 2, reply.getTerm());
// The in-progress snapshot install must have been dropped
939 assertNull(follower.getSnapshotTracker());
// Sends the snapshot to the follower chunk by chunk and expects a FollowerInitialSyncUpStatus reporting that the
// initial sync is not done; then moves the context to index 101, sends an AppendEntries and expects a
// FollowerInitialSyncUpStatus reporting that the initial sync is done.
// NOTE(review): offset, chunkSize and chunkIndex are used below but their declarations are not among the
// lines visible here.
943 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
// NOTE(review): the label below is shorter than this method's name - confirm whether that is intentional.
944 logStart("testInitialSyncUpWithHandleInstallSnapshot");
946 MockRaftActorContext context = createActorContext();
947 context.setCommitIndex(-1);
949 follower = createBehavior(context);
951 ByteString bsSnapshot = createSnapshot();
953 int snapshotLength = bsSnapshot.size();
// Number of chunks, rounding up when the length is not a multiple of chunkSize
955 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
956 int lastIncludedIndex = 1;
958 InstallSnapshot lastInstallSnapshot = null;
// Hand every chunk to the follower
960 for (int i = 0; i < totalChunks; i++) {
961 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
962 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
963 chunkData, chunkIndex, totalChunks);
964 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): advances by a literal 50 while the chunk length passed above is chunkSize -
// presumably the same value; confirm.
965 offset = offset + 50;
// First status collected by followerActor must say the initial sync is not done
970 FollowerInitialSyncUpStatus syncStatus =
971 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
973 assertFalse(syncStatus.isInitialSyncDone());
975 // Clear all the messages
976 MessageCollectorActor.clearMessages(followerActor);
// Move the context to index 101 (last applied, commit index and last log entry)
978 context.setLastApplied(101);
979 context.setCommitIndex(101);
980 setLastLogEntry(context, 1, 101,
981 new MockRaftActorContext.MockPayload(""));
983 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
985 // The new commitIndex is 101
986 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
987 follower.handleMessage(leaderActor, appendEntries);
// After the AppendEntries the status must say the initial sync is done
989 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
991 assertTrue(syncStatus.isInitialSyncDone());
// Sends, as the very first InstallSnapshot, a message whose chunk index is 3 of 3 total chunks, and expects an
// unsuccessful InstallSnapshotReply with chunk index -1 and term 1, and no snapshot tracker afterwards.
995 public void testHandleOutOfSequenceInstallSnapshot() {
996 logStart("testHandleOutOfSequenceInstallSnapshot");
998 MockRaftActorContext context = createActorContext();
1000 follower = createBehavior(context);
1002 ByteString bsSnapshot = createSnapshot();
// Chunk data is taken from offset 10 with a chunk size of 50; no earlier chunk has been sent
1004 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1005 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1006 follower.handleMessage(leaderActor, installSnapshot);
1008 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1009 InstallSnapshotReply.class);
// The reply must signal failure with chunk index -1
1011 assertEquals("isSuccess", false, reply.isSuccess());
1012 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1013 assertEquals("getTerm", 1, reply.getTerm());
1014 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1016 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
// Creates a follower and expects a TimeoutNow message to be collected by followerActor in less time than the
// configured election timeout interval; handling that TimeoutNow must yield a Candidate behavior.
// NOTE(review): presumably the context from createActorContext() has no peers, as the method name says -
// that setup is not visible here.
1020 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1021 MockRaftActorContext context = createActorContext();
// Measure how long it takes from creating the behavior until TimeoutNow shows up
1023 Stopwatch stopwatch = Stopwatch.createStarted();
1025 follower = createBehavior(context);
1027 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1029 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// TimeoutNow must have arrived before a full election timeout interval elapsed
1031 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1033 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1034 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
// Configures a 100 ms election timeout interval and the raft policy from createRaftPolicy(false, false), then
// expects a TimeoutNow message on followerActor; handling it must return the same follower behavior.
// NOTE(review): presumably createRaftPolicy(false, false) disables automatic elections, as the method name
// says - the helper is not visible here.
1038 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1039 MockRaftActorContext context = createActorContext();
// Anonymous config subclass that fixes the election timeout interval at 100 ms
1040 context.setConfigParams(new DefaultConfigParamsImpl() {
1042 public FiniteDuration getElectionTimeOutInterval() {
1043 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1047 context.setRaftPolicy(createRaftPolicy(false, false));
1049 follower = createBehavior(context);
1051 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1052 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
// The behavior must stay the same follower instance
1053 assertSame("handleMessage result", follower, newBehavior);
// Registers this node through a ServerInfo created with (context.getId(), false), shortens the heartbeat
// interval to 100 ms with an election timeout factor of 1, and expects an ElectionTimeout message on
// followerActor; handling it must return the same follower and leave the leader id null.
// NOTE(review): presumably the 'false' passed to ServerInfo marks the node as non-voting, as the method name
// says - confirm against ServerInfo.
1057 public void testFollowerSchedulesElectionIfNonVoting() {
1058 MockRaftActorContext context = createActorContext();
1059 context.updatePeerIds(new ServerConfigurationPayload(List.of(new ServerInfo(context.getId(), false))));
1060 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1061 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1062 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
// Constructed directly (not via createBehavior) with initial leader id "leader"
1064 follower = new Follower(context, "leader", (short)1);
1066 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1067 ElectionTimeout.class);
1068 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1069 assertSame("handleMessage result", follower, newBehavior);
// After the election timeout the follower must no longer report a leader
1070 assertNull("Expected null leaderId", follower.getLeaderId());
1074 // TODO: parameterized with all possible RaftRPCs
1075 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1076 MockRaftActorContext context = createActorContext();
1077 follower = createBehavior(context);
1078 follower.handleMessage(leaderActor, new RequestVoteReply(100, false));
1079 verify(follower).scheduleElection(any(FiniteDuration.class));
1083 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1084 MockRaftActorContext context = createActorContext();
1085 follower = createBehavior(context);
1086 follower.handleMessage(leaderActor, "non-raft-rpc");
1087 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
// Runs a persistent MockRaftActor with a snapshot batch count of 2, sends it an AppendEntries carrying the
// entries at indexes 0 and 1, and then checks the saved snapshot and what is left in the in-memory journal.
// NOTE(review): presumably the sixth AppendEntries argument (1 here) is the leader's commit index - the
// constructor is not visible here.
1091 public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1092 String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
// Seed the journal for this id with an UpdateElectionTerm entry
1095 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1097 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1098 config.setSnapshotBatchCount(2);
1099 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state through this reference, which is filled in once the actor exists
1101 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1102 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1103 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1104 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1105 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1106 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1107 followerRaftActor.set(followerActorRef.underlyingActor());
1108 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register latches so the waits below can block until the snapshot and journal operations complete
1110 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1111 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1112 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1114 List<ReplicatedLogEntry> entries = List.of(
1115 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1117 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1119 followerActorRef.tell(appendEntries, leaderActor);
1121 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1122 assertEquals("isSuccess", true, reply.isSuccess());
1124 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1126 InMemoryJournal.waitForDeleteMessagesComplete(id);
1127 InMemoryJournal.waitForWriteMessagesComplete(id);
1128 // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1129 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1130 // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1131 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1132 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1133 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1134 assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// Both entries are expected to be applied: nothing unapplied, last applied/last index are 1
1136 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1137 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1138 assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1139 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1140 assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
// The snapshot state must contain the payloads of both entries, in order
1141 assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData()),
1142 MockRaftActor.fromState(snapshot.getState()));
// Runs a persistent MockRaftActor with a snapshot batch count of 2, sends it an AppendEntries carrying the
// entries at indexes 0, 1 and 2, checks the saved snapshot and the in-memory journal, then kills and
// re-creates the actor and checks the state it comes back with.
// NOTE(review): presumably the sixth AppendEntries argument (2 here) is the leader's commit index - the
// constructor is not visible here.
1146 public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1147 String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
// Seed the journal for this id with an UpdateElectionTerm entry
1150 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1152 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1153 config.setSnapshotBatchCount(2);
1154 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state through this reference, which is filled in once the actor exists
1156 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1157 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1158 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1159 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1160 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1161 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1162 followerRaftActor.set(followerActorRef.underlyingActor());
1163 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register latches so the waits below can block until the snapshot and journal operations complete
1165 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1166 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1167 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1169 List<ReplicatedLogEntry> entries = List.of(
1170 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1171 newReplicatedLogEntry(1, 2, "three"));
1173 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1175 followerActorRef.tell(appendEntries, leaderActor);
1177 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1178 assertEquals("isSuccess", true, reply.isSuccess());
1180 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1182 InMemoryJournal.waitForDeleteMessagesComplete(id);
1183 InMemoryJournal.waitForWriteMessagesComplete(id);
1184 // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1185 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1186 // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1187 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1188 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1189 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1190 assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// All three entries are expected to be applied: nothing unapplied, last applied/last index are 2
1192 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1193 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1194 assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1195 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1196 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1197 assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData(),
1198 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
// The in-memory log must be empty, with the snapshot index at 2
1200 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1201 assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1203 // Reinstate the actor from persistence
1205 actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1207 followerActorRef = actorFactory.createTestActor(builder.props()
1208 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1209 followerRaftActor.set(followerActorRef.underlyingActor());
1210 followerRaftActor.get().waitForInitializeBehaviorComplete();
// The re-created actor must report the same log position and the same three-payload state
1212 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1213 assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1214 assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1215 assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1216 assertEquals("State", List.of(entries.get(0).getData(), entries.get(1).getData(),
1217 entries.get(2).getData()), followerRaftActor.get().getState());
// Runs a persistent MockRaftActor with a snapshot batch count of 1, sends it an AppendEntries carrying the
// entries at indexes 0, 1 and 2, and checks that the saved snapshot has index 0 applied and indexes 1 and 2
// recorded as unapplied entries.
// NOTE(review): presumably the sixth AppendEntries argument (0 here) is the leader's commit index - the
// constructor is not visible here.
1221 public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1222 String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
// Seed the journal for this id with an UpdateElectionTerm entry
1225 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1227 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1228 config.setSnapshotBatchCount(1);
1229 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state through this reference, which is filled in once the actor exists
1231 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1232 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1233 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1234 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1235 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1236 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1237 followerRaftActor.set(followerActorRef.underlyingActor());
1238 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register latches so the waits below can block until the snapshot and journal operations complete
1240 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1241 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1242 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1244 List<ReplicatedLogEntry> entries = List.of(
1245 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1246 newReplicatedLogEntry(1, 2, "three"));
1248 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1250 followerActorRef.tell(appendEntries, leaderActor);
1252 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1253 assertEquals("isSuccess", true, reply.isSuccess());
1255 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1257 InMemoryJournal.waitForDeleteMessagesComplete(id);
1258 InMemoryJournal.waitForWriteMessagesComplete(id);
1259 // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1260 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1261 // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1262 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1263 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1264 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1265 assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// Only index 0 is expected to be applied; indexes 1 and 2 must appear as unapplied entries
1267 assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1268 assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).index());
1269 assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).index());
1270 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1271 assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1272 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1273 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
// The snapshot state must contain only the payload of the first entry
1274 assertEquals("Snapshot state", List.of(entries.get(0).getData()),
1275 MockRaftActor.fromState(snapshot.getState()));
// Adds the peer "leader" with a null address and NoopPeerAddressResolver, and expects the reply to an
// AppendEntries to have isNeedsLeaderAddress() set. Then installs a mock resolver, sends an AppendEntries that
// carries the leader actor's path, and expects the flag to be clear and the resolver to have been told the
// address via setResolved.
1279 public void testNeedsLeaderAddress() {
1280 logStart("testNeedsLeaderAddress");
1282 MockRaftActorContext context = createActorContext();
1283 context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
// Register the leader as a voting peer without an address
1284 context.addToPeers("leader", null, VotingState.VOTING);
1285 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
1287 follower = createBehavior(context);
// First AppendEntries: empty entry list, built without a leader address argument
1289 follower.handleMessage(leaderActor,
1290 new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short)0));
1292 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1293 assertTrue(reply.isNeedsLeaderAddress());
1294 MessageCollectorActor.clearMessages(leaderActor);
// Swap in a mock resolver so the setResolved call can be verified
1296 PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
1297 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
// Second AppendEntries: same as above but also carrying the version and the leader actor's path
1299 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1,
1300 (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
1302 reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1303 assertFalse(reply.isNeedsLeaderAddress());
1305 verify(mockResolver).setResolved("leader", leaderActor.path().toString());
// Builds a RaftActorSnapshotCohort for the tests above. Its createSnapshot replies to the given actor with a
// CaptureSnapshotReply that wraps the current state of the MockRaftActor held in followerRaftActor; its
// deserializeSnapshot always throws UnsupportedOperationException.
// NOTE(review): the opening of the try block, the body of the RuntimeException handler and the body of
// applySnapshot are not among the lines visible here.
1308 @SuppressWarnings("checkstyle:IllegalCatch")
1309 private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1310 final AtomicReference<MockRaftActor> followerRaftActor) {
1311 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1313 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
// The reply is sent to actorRef with actorRef itself as the sender
1315 actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1316 installSnapshotStream), actorRef);
1317 } catch (RuntimeException e) {
// Any other exception is wrapped in a RuntimeException, keeping the original as the cause
1319 } catch (Exception e) {
1320 throw new RuntimeException(e);
1325 public void applySnapshot(final State snapshotState) {
1329 public State deserializeSnapshot(final ByteSource snapshotBytes) {
1330 throw new UnsupportedOperationException();
1333 return snapshotCohort;
1336 public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1337 int snapshotLength = bs.size();
1339 int size = chunkSize;
1340 if (chunkSize > snapshotLength) {
1341 size = snapshotLength;
1342 } else if (start + chunkSize > snapshotLength) {
1343 size = snapshotLength - start;
1346 byte[] nextChunk = new byte[size];
1347 bs.copyTo(nextChunk, start, 0, size);
1351 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1352 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1353 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
// Expects an AppendEntriesReply among the messages collected by leaderActor and compares each of its fields
// with the expected values passed in. The payload version is compared with the payloadVersion field (declared
// outside the visible lines), and isNeedsLeaderAddress is always expected to be false.
1356 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1357 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1358 final boolean expForceInstallSnapshot) {
1360 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1361 AppendEntriesReply.class);
1363 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1364 assertEquals("getTerm", expTerm, reply.getTerm());
1365 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1366 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1367 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1368 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1369 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1370 assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
1374 private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1375 return new SimpleReplicatedLogEntry(index, term,
1376 new MockRaftActorContext.MockPayload(data));
1379 private ByteString createSnapshot() {
1380 return toByteString(Map.of("1", "A", "2", "B", "3", "C"));
// Runs the superclass check first, then additionally asserts the votedFor value recorded in the context's term
// info: the candidate id when the RPC is a RequestVote, otherwise null.
1384 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1385 final ActorRef actorRef, final RaftRPC rpc) {
1386 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
// Only a RequestVote is expected to leave a vote behind
1388 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1389 assertEquals("New votedFor", expVotedFor, actorContext.termInfo().votedFor());
1393 protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1394 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1395 assertEquals("isSuccess", true, reply.isSuccess());