2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertArrayEquals;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.verify;
23 import akka.actor.ActorRef;
24 import akka.dispatch.Dispatchers;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.io.ByteSource;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import java.io.OutputStream;
32 import java.util.List;
34 import java.util.Optional;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.junit.After;
38 import org.junit.Test;
39 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
40 import org.opendaylight.controller.cluster.raft.MockRaftActor;
41 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
42 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
43 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
44 import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
45 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
46 import org.opendaylight.controller.cluster.raft.RaftActorContext;
47 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
48 import org.opendaylight.controller.cluster.raft.RaftVersions;
49 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.VotingState;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
53 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
54 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
55 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
69 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
72 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
73 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import scala.concurrent.duration.FiniteDuration;
77 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
79 private final ActorRef followerActor = actorFactory.createActor(
80 MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
82 private final ActorRef leaderActor = actorFactory.createActor(
83 MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
85 private Follower follower;
87 private final short payloadVersion = 5;
91 public void tearDown() {
92 if (follower != null) {
100 protected Follower createBehavior(final RaftActorContext actorContext) {
101 return spy(new Follower(actorContext));
105 protected MockRaftActorContext createActorContext() {
106 return createActorContext(followerActor);
110 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
111 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
112 context.setPayloadVersion(payloadVersion);
113 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
114 peerId -> leaderActor.path().toString());
119 public void testThatAnElectionTimeoutIsTriggered() {
120 MockRaftActorContext actorContext = createActorContext();
121 follower = new Follower(actorContext);
123 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
124 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
128 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
129 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
131 MockRaftActorContext context = createActorContext();
132 follower = new Follower(context);
134 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
135 TimeUnit.MILLISECONDS);
136 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
138 assertTrue(raftBehavior instanceof Candidate);
142 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
143 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
145 MockRaftActorContext context = createActorContext();
146 ((DefaultConfigParamsImpl) context.getConfigParams())
147 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
148 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
150 follower = new Follower(context);
151 context.setCurrentBehavior(follower);
153 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
154 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
155 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
158 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
159 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
160 assertTrue(raftBehavior instanceof Follower);
162 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
163 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
164 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
167 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
168 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
169 assertTrue(raftBehavior instanceof Follower);
173 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
174 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
176 MockRaftActorContext context = createActorContext();
178 context.getTermInformation().update(term, null);
180 follower = createBehavior(context);
182 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
184 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
186 assertEquals("isVoteGranted", true, reply.isVoteGranted());
187 assertEquals("getTerm", term, reply.getTerm());
188 verify(follower).scheduleElection(any(FiniteDuration.class));
192 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
193 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
195 MockRaftActorContext context = createActorContext();
197 context.getTermInformation().update(term, "test");
199 follower = createBehavior(context);
201 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
203 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
205 assertEquals("isVoteGranted", false, reply.isVoteGranted());
206 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
211 public void testHandleFirstAppendEntries() {
212 logStart("testHandleFirstAppendEntries");
214 MockRaftActorContext context = createActorContext();
215 context.getReplicatedLog().clear(0,2);
216 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
217 context.getReplicatedLog().setSnapshotIndex(99);
219 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
221 assertEquals(1, context.getReplicatedLog().size());
223 // The new commitIndex is 101
224 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
226 follower = createBehavior(context);
227 follower.handleMessage(leaderActor, appendEntries);
229 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
230 FollowerInitialSyncUpStatus.class);
231 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
233 assertFalse(syncStatus.isInitialSyncDone());
234 assertTrue("append entries reply should be true", reply.isSuccess());
238 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
239 logStart("testHandleFirstAppendEntries");
241 MockRaftActorContext context = createActorContext();
243 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
245 // The new commitIndex is 101
246 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
248 follower = createBehavior(context);
249 follower.handleMessage(leaderActor, appendEntries);
251 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
252 FollowerInitialSyncUpStatus.class);
253 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
255 assertFalse(syncStatus.isInitialSyncDone());
256 assertFalse("append entries reply should be false", reply.isSuccess());
260 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
261 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
263 MockRaftActorContext context = createActorContext();
264 context.getReplicatedLog().clear(0,2);
265 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
266 context.getReplicatedLog().setSnapshotIndex(99);
268 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
270 // The new commitIndex is 101
271 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
273 follower = createBehavior(context);
274 follower.handleMessage(leaderActor, appendEntries);
276 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
277 FollowerInitialSyncUpStatus.class);
278 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
280 assertFalse(syncStatus.isInitialSyncDone());
281 assertTrue("append entries reply should be true", reply.isSuccess());
285 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
286 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
288 MockRaftActorContext context = createActorContext();
289 context.getReplicatedLog().clear(0,2);
290 context.getReplicatedLog().setSnapshotIndex(100);
292 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
294 // The new commitIndex is 101
295 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
297 follower = createBehavior(context);
298 follower.handleMessage(leaderActor, appendEntries);
300 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
301 FollowerInitialSyncUpStatus.class);
302 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
304 assertFalse(syncStatus.isInitialSyncDone());
305 assertTrue("append entries reply should be true", reply.isSuccess());
309 public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
311 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
313 MockRaftActorContext context = createActorContext();
314 context.getReplicatedLog().clear(0,2);
315 context.getReplicatedLog().setSnapshotIndex(100);
317 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 105, "foo"));
319 // The new commitIndex is 101
320 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
322 follower = createBehavior(context);
323 follower.handleMessage(leaderActor, appendEntries);
325 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
326 FollowerInitialSyncUpStatus.class);
327 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
329 assertFalse(syncStatus.isInitialSyncDone());
330 assertFalse("append entries reply should be false", reply.isSuccess());
334 public void testHandleSyncUpAppendEntries() {
335 logStart("testHandleSyncUpAppendEntries");
337 MockRaftActorContext context = createActorContext();
339 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
341 // The new commitIndex is 101
342 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
344 follower = createBehavior(context);
345 follower.handleMessage(leaderActor, appendEntries);
347 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
348 FollowerInitialSyncUpStatus.class);
350 assertFalse(syncStatus.isInitialSyncDone());
352 // Clear all the messages
353 MessageCollectorActor.clearMessages(followerActor);
355 context.setLastApplied(101);
356 context.setCommitIndex(101);
357 setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
359 entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
361 // The new commitIndex is 101
362 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
363 follower.handleMessage(leaderActor, appendEntries);
365 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
367 assertTrue(syncStatus.isInitialSyncDone());
369 MessageCollectorActor.clearMessages(followerActor);
371 // Sending the same message again should not generate another message
373 follower.handleMessage(leaderActor, appendEntries);
375 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
377 assertNull(syncStatus);
381 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
382 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
384 MockRaftActorContext context = createActorContext();
386 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
388 // The new commitIndex is 101
389 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
391 follower = createBehavior(context);
392 follower.handleMessage(leaderActor, appendEntries);
394 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
395 FollowerInitialSyncUpStatus.class);
397 assertFalse(syncStatus.isInitialSyncDone());
399 // Clear all the messages
400 MessageCollectorActor.clearMessages(followerActor);
402 context.setLastApplied(100);
403 setLastLogEntry(context, 1, 100,
404 new MockRaftActorContext.MockPayload(""));
406 entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
408 // leader-2 is becoming the leader now and it says the commitIndex is 45
409 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
410 follower.handleMessage(leaderActor, appendEntries);
412 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
414 // We get a new message saying initial status is not done
415 assertFalse(syncStatus.isInitialSyncDone());
419 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
420 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
422 MockRaftActorContext context = createActorContext();
424 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
426 // The new commitIndex is 101
427 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
429 follower = createBehavior(context);
430 follower.handleMessage(leaderActor, appendEntries);
432 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
433 FollowerInitialSyncUpStatus.class);
435 assertFalse(syncStatus.isInitialSyncDone());
437 // Clear all the messages
438 MessageCollectorActor.clearMessages(followerActor);
440 context.setLastApplied(101);
441 context.setCommitIndex(101);
442 setLastLogEntry(context, 1, 101,
443 new MockRaftActorContext.MockPayload(""));
445 entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
447 // The new commitIndex is 101
448 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
449 follower.handleMessage(leaderActor, appendEntries);
451 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
453 assertTrue(syncStatus.isInitialSyncDone());
455 // Clear all the messages
456 MessageCollectorActor.clearMessages(followerActor);
458 context.setLastApplied(100);
459 setLastLogEntry(context, 1, 100,
460 new MockRaftActorContext.MockPayload(""));
462 entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
464 // leader-2 is becoming the leader now and it says the commitIndex is 45
465 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
466 follower.handleMessage(leaderActor, appendEntries);
468 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
470 // We get a new message saying initial status is not done
471 assertFalse(syncStatus.isInitialSyncDone());
475 * This test verifies that when an AppendEntries RPC is received by a RaftActor
476 * with a commitIndex that is greater than what has been applied to the
477 * state machine of the RaftActor, the RaftActor applies the state and
478 * sets it current applied state to the commitIndex of the sender.
481 public void testHandleAppendEntriesWithNewerCommitIndex() {
482 logStart("testHandleAppendEntriesWithNewerCommitIndex");
484 MockRaftActorContext context = createActorContext();
486 context.setLastApplied(100);
487 setLastLogEntry(context, 1, 100,
488 new MockRaftActorContext.MockPayload(""));
489 context.getReplicatedLog().setSnapshotIndex(99);
491 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
493 // The new commitIndex is 101
494 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
496 follower = createBehavior(context);
497 follower.handleMessage(leaderActor, appendEntries);
499 assertEquals("getLastApplied", 101L, context.getLastApplied());
503 * This test verifies that when an AppendEntries is received with a prevLogTerm
504 * which does not match the term that is in RaftActors log entry at prevLogIndex
505 * then the RaftActor does not change it's state and it returns a failure.
508 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
509 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
511 MockRaftActorContext context = createActorContext();
513 AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, List.of(), 101, -1, (short)0);
515 follower = createBehavior(context);
517 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
519 assertSame(follower, newBehavior);
521 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
522 AppendEntriesReply.class);
524 assertEquals("isSuccess", false, reply.isSuccess());
528 public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
529 logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
531 MockRaftActorContext context = createActorContext();
532 context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
533 context.getReplicatedLog().setSnapshotIndex(4);
534 context.getReplicatedLog().setSnapshotTerm(3);
536 AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, List.of(), 8, -1, (short)0);
538 follower = createBehavior(context);
540 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
542 assertSame(follower, newBehavior);
544 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
546 assertEquals("isSuccess", true, reply.isSuccess());
550 * This test verifies that when a new AppendEntries message is received with
551 * new entries and the logs of the sender and receiver match that the new
552 * entries get added to the log and the log is incremented by the number of
553 * entries received in appendEntries.
556 public void testHandleAppendEntriesAddNewEntries() {
557 logStart("testHandleAppendEntriesAddNewEntries");
559 MockRaftActorContext context = createActorContext();
561 // First set the receivers term to lower number
562 context.getTermInformation().update(1, "test");
564 // Prepare the receivers log
565 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
566 log.append(newReplicatedLogEntry(1, 0, "zero"));
567 log.append(newReplicatedLogEntry(1, 1, "one"));
568 log.append(newReplicatedLogEntry(1, 2, "two"));
570 context.setReplicatedLog(log);
572 // Prepare the entries to be sent with AppendEntries
573 List<ReplicatedLogEntry> entries = List.of(
574 newReplicatedLogEntry(1, 3, "three"), newReplicatedLogEntry(1, 4, "four"));
576 // Send appendEntries with the same term as was set on the receiver
577 // before the new behavior was created (1 in this case)
578 // This will not work for a Candidate because as soon as a Candidate
579 // is created it increments the term
580 short leaderPayloadVersion = 10;
581 String leaderId = "leader-1";
582 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
584 follower = createBehavior(context);
586 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
588 assertSame(follower, newBehavior);
590 assertEquals("Next index", 5, log.last().getIndex() + 1);
591 assertEquals("Entry 3", entries.get(0), log.get(3));
592 assertEquals("Entry 4", entries.get(1), log.get(4));
594 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
595 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
597 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
601 * This test verifies that when a new AppendEntries message is received with
602 * new entries and the logs of the sender and receiver are out-of-sync that
603 * the log is first corrected by removing the out of sync entries from the
604 * log and then adding in the new entries sent with the AppendEntries message.
607 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
608 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
610 MockRaftActorContext context = createActorContext();
612 // First set the receivers term to lower number
613 context.getTermInformation().update(1, "test");
615 // Prepare the receivers log
616 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
617 log.append(newReplicatedLogEntry(1, 0, "zero"));
618 log.append(newReplicatedLogEntry(1, 1, "one"));
619 log.append(newReplicatedLogEntry(1, 2, "two"));
621 context.setReplicatedLog(log);
623 // Prepare the entries to be sent with AppendEntries
624 List<ReplicatedLogEntry> entries = List.of(
625 newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
627 // Send appendEntries with the same term as was set on the receiver
628 // before the new behavior was created (1 in this case)
629 // This will not work for a Candidate because as soon as a Candidate
630 // is created it increments the term
631 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
633 follower = createBehavior(context);
635 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
637 assertSame(follower, newBehavior);
639 // The entry at index 2 will be found out-of-sync with the leader
640 // and will be removed
641 // Then the two new entries will be added to the log
642 // Thus making the log to have 4 entries
643 assertEquals("Next index", 4, log.last().getIndex() + 1);
644 //assertEquals("Entry 2", entries.get(0), log.get(2));
646 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
648 // Check that the entry at index 2 has the new data
649 assertEquals("Entry 2", entries.get(0), log.get(2));
651 assertEquals("Entry 3", entries.get(1), log.get(3));
653 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
657 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
658 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
660 MockRaftActorContext context = createActorContext();
662 // First set the receivers term to lower number
663 context.getTermInformation().update(1, "test");
665 // Prepare the receivers log
666 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
667 log.append(newReplicatedLogEntry(1, 0, "zero"));
668 log.append(newReplicatedLogEntry(1, 1, "one"));
669 log.append(newReplicatedLogEntry(1, 2, "two"));
671 context.setReplicatedLog(log);
673 // Prepare the entries to be sent with AppendEntries
674 List<ReplicatedLogEntry> entries = List.of(
675 newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
677 // Send appendEntries with the same term as was set on the receiver
678 // before the new behavior was created (1 in this case)
679 // This will not work for a Candidate because as soon as a Candidate
680 // is created it increments the term
681 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
683 context.setRaftPolicy(createRaftPolicy(false, true));
684 follower = createBehavior(context);
686 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
688 assertSame(follower, newBehavior);
690 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
694 public void testHandleAppendEntriesPreviousLogEntryMissing() {
695 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
697 final MockRaftActorContext context = createActorContext();
699 // Prepare the receivers log
700 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
701 log.append(newReplicatedLogEntry(1, 0, "zero"));
702 log.append(newReplicatedLogEntry(1, 1, "one"));
703 log.append(newReplicatedLogEntry(1, 2, "two"));
705 context.setReplicatedLog(log);
707 // Prepare the entries to be sent with AppendEntries
708 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
710 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
712 follower = createBehavior(context);
714 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
716 assertSame(follower, newBehavior);
718 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
722 public void testHandleAppendEntriesWithExistingLogEntry() {
723 logStart("testHandleAppendEntriesWithExistingLogEntry");
725 MockRaftActorContext context = createActorContext();
727 context.getTermInformation().update(1, "test");
729 // Prepare the receivers log
730 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
731 log.append(newReplicatedLogEntry(1, 0, "zero"));
732 log.append(newReplicatedLogEntry(1, 1, "one"));
734 context.setReplicatedLog(log);
736 // Send the last entry again.
737 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 1, "one"));
739 follower = createBehavior(context);
741 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
743 assertEquals("Next index", 2, log.last().getIndex() + 1);
744 assertEquals("Entry 1", entries.get(0), log.get(1));
746 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
748 // Send the last entry again and also a new one.
750 entries = List.of(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
752 MessageCollectorActor.clearMessages(leaderActor);
753 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
755 assertEquals("Next index", 3, log.last().getIndex() + 1);
756 assertEquals("Entry 1", entries.get(0), log.get(1));
757 assertEquals("Entry 2", entries.get(1), log.get(2));
759 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
763 public void testHandleAppendEntriesAfterInstallingSnapshot() {
764 logStart("testHandleAppendAfterInstallingSnapshot");
766 MockRaftActorContext context = createActorContext();
768 // Prepare the receivers log
769 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
771 // Set up a log as if it has been snapshotted
772 log.setSnapshotIndex(3);
773 log.setSnapshotTerm(1);
775 context.setReplicatedLog(log);
777 // Prepare the entries to be sent with AppendEntries
778 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
780 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
782 follower = createBehavior(context);
784 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
786 assertSame(follower, newBehavior);
788 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
792 * This test verifies that when InstallSnapshot is received by
793 * the follower its applied correctly.
796 public void testHandleInstallSnapshot() {
797 logStart("testHandleInstallSnapshot");
799 MockRaftActorContext context = createActorContext();
800 context.getTermInformation().update(1, "leader");
802 follower = createBehavior(context);
804 ByteString bsSnapshot = createSnapshot();
806 int snapshotLength = bsSnapshot.size();
808 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
809 int lastIncludedIndex = 1;
811 InstallSnapshot lastInstallSnapshot = null;
813 for (int i = 0; i < totalChunks; i++) {
814 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
815 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
816 chunkData, chunkIndex, totalChunks);
817 follower.handleMessage(leaderActor, lastInstallSnapshot);
818 offset = offset + 50;
823 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
824 ApplySnapshot.class);
825 Snapshot snapshot = applySnapshot.getSnapshot();
826 assertNotNull(lastInstallSnapshot);
827 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
828 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
829 snapshot.getLastAppliedTerm());
830 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
831 snapshot.getLastAppliedIndex());
832 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
833 assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
834 assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
835 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
836 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
837 applySnapshot.getCallback().onSuccess();
839 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
840 leaderActor, InstallSnapshotReply.class);
841 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
844 for (InstallSnapshotReply reply: replies) {
845 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
846 assertEquals("getTerm", 1, reply.getTerm());
847 assertEquals("isSuccess", true, reply.isSuccess());
848 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
851 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
855 * Verify that when an AppendEntries is sent to a follower during a snapshot install
856 * the Follower short-circuits the processing of the AppendEntries message.
859 public void testReceivingAppendEntriesDuringInstallSnapshot() {
860 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
862 MockRaftActorContext context = createActorContext();
864 follower = createBehavior(context);
866 ByteString bsSnapshot = createSnapshot();
867 int snapshotLength = bsSnapshot.size();
869 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
870 int lastIncludedIndex = 1;
872 // Check that snapshot installation is not in progress
873 assertNull(follower.getSnapshotTracker());
875 // Make sure that we have more than 1 chunk to send
876 assertTrue(totalChunks > 1);
878 // Send an install snapshot with the first chunk to start the process of installing a snapshot
879 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
880 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
881 chunkData, 1, totalChunks));
883 // Check if snapshot installation is in progress now
884 assertNotNull(follower.getSnapshotTracker());
886 // Send an append entry
887 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
888 List.of(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
890 follower.handleMessage(leaderActor, appendEntries);
892 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
893 assertEquals("isSuccess", true, reply.isSuccess());
894 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
895 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
896 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
898 assertNotNull(follower.getSnapshotTracker());
902 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
903 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
905 MockRaftActorContext context = createActorContext();
907 follower = createBehavior(context);
909 ByteString bsSnapshot = createSnapshot();
910 int snapshotLength = bsSnapshot.size();
912 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
913 int lastIncludedIndex = 1;
915 // Check that snapshot installation is not in progress
916 assertNull(follower.getSnapshotTracker());
918 // Make sure that we have more than 1 chunk to send
919 assertTrue(totalChunks > 1);
921 // Send an install snapshot with the first chunk to start the process of installing a snapshot
922 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
923 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
924 chunkData, 1, totalChunks));
926 // Check if snapshot installation is in progress now
927 assertNotNull(follower.getSnapshotTracker());
929 // Send appendEntries with a new term and leader.
930 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
931 List.of(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
933 follower.handleMessage(leaderActor, appendEntries);
935 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
936 assertEquals("isSuccess", true, reply.isSuccess());
937 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
938 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
939 assertEquals("getTerm", 2, reply.getTerm());
941 assertNull(follower.getSnapshotTracker());
945 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
946 logStart("testInitialSyncUpWithHandleInstallSnapshot");
948 MockRaftActorContext context = createActorContext();
949 context.setCommitIndex(-1);
951 follower = createBehavior(context);
953 ByteString bsSnapshot = createSnapshot();
955 int snapshotLength = bsSnapshot.size();
957 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
958 int lastIncludedIndex = 1;
960 InstallSnapshot lastInstallSnapshot = null;
962 for (int i = 0; i < totalChunks; i++) {
963 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
964 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
965 chunkData, chunkIndex, totalChunks);
966 follower.handleMessage(leaderActor, lastInstallSnapshot);
967 offset = offset + 50;
972 FollowerInitialSyncUpStatus syncStatus =
973 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
975 assertFalse(syncStatus.isInitialSyncDone());
977 // Clear all the messages
978 MessageCollectorActor.clearMessages(followerActor);
980 context.setLastApplied(101);
981 context.setCommitIndex(101);
982 setLastLogEntry(context, 1, 101,
983 new MockRaftActorContext.MockPayload(""));
985 List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
987 // The new commitIndex is 101
988 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
989 follower.handleMessage(leaderActor, appendEntries);
991 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
993 assertTrue(syncStatus.isInitialSyncDone());
997 public void testHandleOutOfSequenceInstallSnapshot() {
998 logStart("testHandleOutOfSequenceInstallSnapshot");
1000 MockRaftActorContext context = createActorContext();
1002 follower = createBehavior(context);
1004 ByteString bsSnapshot = createSnapshot();
1006 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1007 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1008 follower.handleMessage(leaderActor, installSnapshot);
1010 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1011 InstallSnapshotReply.class);
1013 assertEquals("isSuccess", false, reply.isSuccess());
1014 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1015 assertEquals("getTerm", 1, reply.getTerm());
1016 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1018 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1022 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1023 MockRaftActorContext context = createActorContext();
1025 Stopwatch stopwatch = Stopwatch.createStarted();
1027 follower = createBehavior(context);
1029 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1031 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1033 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1035 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1036 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1040 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1041 MockRaftActorContext context = createActorContext();
1042 context.setConfigParams(new DefaultConfigParamsImpl() {
1044 public FiniteDuration getElectionTimeOutInterval() {
1045 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1049 context.setRaftPolicy(createRaftPolicy(false, false));
1051 follower = createBehavior(context);
1053 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1054 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1055 assertSame("handleMessage result", follower, newBehavior);
1059 public void testFollowerSchedulesElectionIfNonVoting() {
1060 MockRaftActorContext context = createActorContext();
1061 context.updatePeerIds(new ServerConfigurationPayload(List.of(new ServerInfo(context.getId(), false))));
1062 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1063 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1064 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1066 follower = new Follower(context, "leader", (short)1);
1068 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1069 ElectionTimeout.class);
1070 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1071 assertSame("handleMessage result", follower, newBehavior);
1072 assertNull("Expected null leaderId", follower.getLeaderId());
1076 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1077 MockRaftActorContext context = createActorContext();
1078 follower = createBehavior(context);
1079 follower.handleMessage(leaderActor, new RaftRPC() {
1080 private static final long serialVersionUID = 1L;
1083 public long getTerm() {
1087 verify(follower).scheduleElection(any(FiniteDuration.class));
1091 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1092 MockRaftActorContext context = createActorContext();
1093 follower = createBehavior(context);
1094 follower.handleMessage(leaderActor, "non-raft-rpc");
1095 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1099 public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1100 String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1103 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1105 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1106 config.setSnapshotBatchCount(2);
1107 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1109 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1110 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1111 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1112 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1113 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1114 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1115 followerRaftActor.set(followerActorRef.underlyingActor());
1116 followerRaftActor.get().waitForInitializeBehaviorComplete();
1118 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1119 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1120 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1122 List<ReplicatedLogEntry> entries = List.of(
1123 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1125 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1127 followerActorRef.tell(appendEntries, leaderActor);
1129 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1130 assertEquals("isSuccess", true, reply.isSuccess());
1132 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1134 InMemoryJournal.waitForDeleteMessagesComplete(id);
1135 InMemoryJournal.waitForWriteMessagesComplete(id);
1136 // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1137 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1138 // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1139 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1140 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1141 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1142 assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1144 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1145 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1146 assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1147 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1148 assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1149 assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData()),
1150 MockRaftActor.fromState(snapshot.getState()));
1154 public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1155 String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1158 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1160 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1161 config.setSnapshotBatchCount(2);
1162 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1164 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1165 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1166 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1167 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1168 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1169 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1170 followerRaftActor.set(followerActorRef.underlyingActor());
1171 followerRaftActor.get().waitForInitializeBehaviorComplete();
1173 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1174 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1175 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1177 List<ReplicatedLogEntry> entries = List.of(
1178 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1179 newReplicatedLogEntry(1, 2, "three"));
1181 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1183 followerActorRef.tell(appendEntries, leaderActor);
1185 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1186 assertEquals("isSuccess", true, reply.isSuccess());
1188 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1190 InMemoryJournal.waitForDeleteMessagesComplete(id);
1191 InMemoryJournal.waitForWriteMessagesComplete(id);
1192 // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1193 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1194 // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1195 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1196 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1197 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1198 assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1200 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1201 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1202 assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1203 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1204 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1205 assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData(),
1206 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1208 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1209 assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1211 // Reinstate the actor from persistence
1213 actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1215 followerActorRef = actorFactory.createTestActor(builder.props()
1216 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1217 followerRaftActor.set(followerActorRef.underlyingActor());
1218 followerRaftActor.get().waitForInitializeBehaviorComplete();
1220 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1221 assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1222 assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1223 assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1224 assertEquals("State", List.of(entries.get(0).getData(), entries.get(1).getData(),
1225 entries.get(2).getData()), followerRaftActor.get().getState());
1229 public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1230 String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1233 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1235 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1236 config.setSnapshotBatchCount(1);
1237 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1239 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1240 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1241 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1242 .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1243 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1244 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1245 followerRaftActor.set(followerActorRef.underlyingActor());
1246 followerRaftActor.get().waitForInitializeBehaviorComplete();
1248 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1249 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1250 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1252 List<ReplicatedLogEntry> entries = List.of(
1253 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1254 newReplicatedLogEntry(1, 2, "three"));
1256 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1258 followerActorRef.tell(appendEntries, leaderActor);
1260 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1261 assertEquals("isSuccess", true, reply.isSuccess());
1263 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1265 InMemoryJournal.waitForDeleteMessagesComplete(id);
1266 InMemoryJournal.waitForWriteMessagesComplete(id);
1267 // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1268 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1269 // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1270 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1271 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1272 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1273 assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1275 assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1276 assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1277 assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1278 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1279 assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1280 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1281 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1282 assertEquals("Snapshot state", List.of(entries.get(0).getData()),
1283 MockRaftActor.fromState(snapshot.getState()));
1287 public void testNeedsLeaderAddress() {
1288 logStart("testNeedsLeaderAddress");
1290 MockRaftActorContext context = createActorContext();
1291 context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
1292 context.addToPeers("leader", null, VotingState.VOTING);
1293 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
1295 follower = createBehavior(context);
1297 follower.handleMessage(leaderActor,
1298 new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short)0));
1300 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1301 assertTrue(reply.isNeedsLeaderAddress());
1302 MessageCollectorActor.clearMessages(leaderActor);
1304 PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
1305 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
1307 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1,
1308 (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
1310 reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1311 assertFalse(reply.isNeedsLeaderAddress());
1313 verify(mockResolver).setResolved("leader", leaderActor.path().toString());
1316 @SuppressWarnings("checkstyle:IllegalCatch")
1317 private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1318 final AtomicReference<MockRaftActor> followerRaftActor) {
1319 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1321 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1323 actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1324 installSnapshotStream), actorRef);
1325 } catch (RuntimeException e) {
1327 } catch (Exception e) {
1328 throw new RuntimeException(e);
1333 public void applySnapshot(final State snapshotState) {
1337 public State deserializeSnapshot(final ByteSource snapshotBytes) {
1338 throw new UnsupportedOperationException();
1341 return snapshotCohort;
1344 public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1345 int snapshotLength = bs.size();
1347 int size = chunkSize;
1348 if (chunkSize > snapshotLength) {
1349 size = snapshotLength;
1350 } else if (start + chunkSize > snapshotLength) {
1351 size = snapshotLength - start;
1354 byte[] nextChunk = new byte[size];
1355 bs.copyTo(nextChunk, start, 0, size);
1359 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1360 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1361 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1364 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1365 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1366 final boolean expForceInstallSnapshot) {
1368 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1369 AppendEntriesReply.class);
1371 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1372 assertEquals("getTerm", expTerm, reply.getTerm());
1373 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1374 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1375 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1376 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1377 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1378 assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
1382 private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1383 return new SimpleReplicatedLogEntry(index, term,
1384 new MockRaftActorContext.MockPayload(data));
1387 private ByteString createSnapshot() {
1388 return toByteString(Map.of("1", "A", "2", "B", "3", "C"));
1392 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1393 final ActorRef actorRef, final RaftRPC rpc) {
1394 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1396 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1397 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1401 protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1402 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1403 assertEquals("isSuccess", true, reply.isSuccess());