2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.dispatch.Dispatchers;
24 import akka.protobuf.ByteString;
25 import akka.testkit.TestActorRef;
26 import akka.testkit.javadsl.TestKit;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.io.ByteSource;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.io.OutputStream;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.MockRaftActor;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
48 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
55 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
56 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
63 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
64 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
65 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
68 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
70 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
71 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
72 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
73 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
74 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import scala.concurrent.duration.FiniteDuration;
78 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
80 private final ActorRef followerActor = actorFactory.createActor(
81 MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
83 private final ActorRef leaderActor = actorFactory.createActor(
84 MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
86 private Follower follower;
88 private final short payloadVersion = 5;
92 public void tearDown() {
93 if (follower != null) {
101 protected Follower createBehavior(final RaftActorContext actorContext) {
102 return spy(new Follower(actorContext));
106 protected MockRaftActorContext createActorContext() {
107 return createActorContext(followerActor);
111 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
112 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
113 context.setPayloadVersion(payloadVersion);
118 public void testThatAnElectionTimeoutIsTriggered() {
119 MockRaftActorContext actorContext = createActorContext();
120 follower = new Follower(actorContext);
122 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
123 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
127 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
128 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
130 MockRaftActorContext context = createActorContext();
131 follower = new Follower(context);
133 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
134 TimeUnit.MILLISECONDS);
135 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
137 assertTrue(raftBehavior instanceof Candidate);
141 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
142 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
144 MockRaftActorContext context = createActorContext();
145 ((DefaultConfigParamsImpl) context.getConfigParams())
146 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
147 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
149 follower = new Follower(context);
150 context.setCurrentBehavior(follower);
152 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
153 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
154 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
157 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
158 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
159 assertTrue(raftBehavior instanceof Follower);
161 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
162 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
163 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
166 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
167 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
168 assertTrue(raftBehavior instanceof Follower);
172 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
173 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
175 MockRaftActorContext context = createActorContext();
177 context.getTermInformation().update(term, null);
179 follower = createBehavior(context);
181 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
183 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
185 assertEquals("isVoteGranted", true, reply.isVoteGranted());
186 assertEquals("getTerm", term, reply.getTerm());
187 verify(follower).scheduleElection(any(FiniteDuration.class));
191 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
192 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
194 MockRaftActorContext context = createActorContext();
196 context.getTermInformation().update(term, "test");
198 follower = createBehavior(context);
200 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
202 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
204 assertEquals("isVoteGranted", false, reply.isVoteGranted());
205 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
210 public void testHandleFirstAppendEntries() {
211 logStart("testHandleFirstAppendEntries");
213 MockRaftActorContext context = createActorContext();
214 context.getReplicatedLog().clear(0,2);
215 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
216 context.getReplicatedLog().setSnapshotIndex(99);
218 List<ReplicatedLogEntry> entries = Arrays.asList(
219 newReplicatedLogEntry(2, 101, "foo"));
221 Assert.assertEquals(1, context.getReplicatedLog().size());
223 // The new commitIndex is 101
224 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
226 follower = createBehavior(context);
227 follower.handleMessage(leaderActor, appendEntries);
229 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
230 FollowerInitialSyncUpStatus.class);
231 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
233 assertFalse(syncStatus.isInitialSyncDone());
234 assertTrue("append entries reply should be true", reply.isSuccess());
238 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
239 logStart("testHandleFirstAppendEntries");
241 MockRaftActorContext context = createActorContext();
243 List<ReplicatedLogEntry> entries = Arrays.asList(
244 newReplicatedLogEntry(2, 101, "foo"));
246 // The new commitIndex is 101
247 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
249 follower = createBehavior(context);
250 follower.handleMessage(leaderActor, appendEntries);
252 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
253 FollowerInitialSyncUpStatus.class);
254 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
256 assertFalse(syncStatus.isInitialSyncDone());
257 assertFalse("append entries reply should be false", reply.isSuccess());
261 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
262 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
264 MockRaftActorContext context = createActorContext();
265 context.getReplicatedLog().clear(0,2);
266 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
267 context.getReplicatedLog().setSnapshotIndex(99);
269 List<ReplicatedLogEntry> entries = Arrays.asList(
270 newReplicatedLogEntry(2, 101, "foo"));
272 // The new commitIndex is 101
273 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
275 follower = createBehavior(context);
276 follower.handleMessage(leaderActor, appendEntries);
278 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
279 FollowerInitialSyncUpStatus.class);
280 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
282 assertFalse(syncStatus.isInitialSyncDone());
283 assertTrue("append entries reply should be true", reply.isSuccess());
287 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
288 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
290 MockRaftActorContext context = createActorContext();
291 context.getReplicatedLog().clear(0,2);
292 context.getReplicatedLog().setSnapshotIndex(100);
294 List<ReplicatedLogEntry> entries = Arrays.asList(
295 newReplicatedLogEntry(2, 101, "foo"));
297 // The new commitIndex is 101
298 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
300 follower = createBehavior(context);
301 follower.handleMessage(leaderActor, appendEntries);
303 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
304 FollowerInitialSyncUpStatus.class);
305 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
307 assertFalse(syncStatus.isInitialSyncDone());
308 assertTrue("append entries reply should be true", reply.isSuccess());
312 public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
314 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
316 MockRaftActorContext context = createActorContext();
317 context.getReplicatedLog().clear(0,2);
318 context.getReplicatedLog().setSnapshotIndex(100);
320 List<ReplicatedLogEntry> entries = Arrays.asList(
321 newReplicatedLogEntry(2, 105, "foo"));
323 // The new commitIndex is 101
324 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
326 follower = createBehavior(context);
327 follower.handleMessage(leaderActor, appendEntries);
329 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
330 FollowerInitialSyncUpStatus.class);
331 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
333 assertFalse(syncStatus.isInitialSyncDone());
334 assertFalse("append entries reply should be false", reply.isSuccess());
338 public void testHandleSyncUpAppendEntries() {
339 logStart("testHandleSyncUpAppendEntries");
341 MockRaftActorContext context = createActorContext();
343 List<ReplicatedLogEntry> entries = Arrays.asList(
344 newReplicatedLogEntry(2, 101, "foo"));
346 // The new commitIndex is 101
347 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
349 follower = createBehavior(context);
350 follower.handleMessage(leaderActor, appendEntries);
352 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
353 FollowerInitialSyncUpStatus.class);
355 assertFalse(syncStatus.isInitialSyncDone());
357 // Clear all the messages
358 MessageCollectorActor.clearMessages(followerActor);
360 context.setLastApplied(101);
361 context.setCommitIndex(101);
362 setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
364 entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
366 // The new commitIndex is 101
367 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
368 follower.handleMessage(leaderActor, appendEntries);
370 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
372 assertTrue(syncStatus.isInitialSyncDone());
374 MessageCollectorActor.clearMessages(followerActor);
376 // Sending the same message again should not generate another message
378 follower.handleMessage(leaderActor, appendEntries);
380 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
382 assertNull(syncStatus);
386 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
387 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
389 MockRaftActorContext context = createActorContext();
391 List<ReplicatedLogEntry> entries = Arrays.asList(
392 newReplicatedLogEntry(2, 101, "foo"));
394 // The new commitIndex is 101
395 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
397 follower = createBehavior(context);
398 follower.handleMessage(leaderActor, appendEntries);
400 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
401 FollowerInitialSyncUpStatus.class);
403 assertFalse(syncStatus.isInitialSyncDone());
405 // Clear all the messages
406 MessageCollectorActor.clearMessages(followerActor);
408 context.setLastApplied(100);
409 setLastLogEntry(context, 1, 100,
410 new MockRaftActorContext.MockPayload(""));
412 entries = Arrays.asList(
413 newReplicatedLogEntry(2, 101, "foo"));
415 // leader-2 is becoming the leader now and it says the commitIndex is 45
416 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
417 follower.handleMessage(leaderActor, appendEntries);
419 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
421 // We get a new message saying initial status is not done
422 assertFalse(syncStatus.isInitialSyncDone());
426 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
427 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
429 MockRaftActorContext context = createActorContext();
431 List<ReplicatedLogEntry> entries = Arrays.asList(
432 newReplicatedLogEntry(2, 101, "foo"));
434 // The new commitIndex is 101
435 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
437 follower = createBehavior(context);
438 follower.handleMessage(leaderActor, appendEntries);
440 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
441 FollowerInitialSyncUpStatus.class);
443 assertFalse(syncStatus.isInitialSyncDone());
445 // Clear all the messages
446 MessageCollectorActor.clearMessages(followerActor);
448 context.setLastApplied(101);
449 context.setCommitIndex(101);
450 setLastLogEntry(context, 1, 101,
451 new MockRaftActorContext.MockPayload(""));
453 entries = Arrays.asList(
454 newReplicatedLogEntry(2, 101, "foo"));
456 // The new commitIndex is 101
457 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
458 follower.handleMessage(leaderActor, appendEntries);
460 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
462 assertTrue(syncStatus.isInitialSyncDone());
464 // Clear all the messages
465 MessageCollectorActor.clearMessages(followerActor);
467 context.setLastApplied(100);
468 setLastLogEntry(context, 1, 100,
469 new MockRaftActorContext.MockPayload(""));
471 entries = Arrays.asList(
472 newReplicatedLogEntry(2, 101, "foo"));
474 // leader-2 is becoming the leader now and it says the commitIndex is 45
475 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
476 follower.handleMessage(leaderActor, appendEntries);
478 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
480 // We get a new message saying initial status is not done
481 assertFalse(syncStatus.isInitialSyncDone());
485 * This test verifies that when an AppendEntries RPC is received by a RaftActor
486 * with a commitIndex that is greater than what has been applied to the
487 * state machine of the RaftActor, the RaftActor applies the state and
488 * sets it current applied state to the commitIndex of the sender.
491 public void testHandleAppendEntriesWithNewerCommitIndex() {
492 logStart("testHandleAppendEntriesWithNewerCommitIndex");
494 MockRaftActorContext context = createActorContext();
496 context.setLastApplied(100);
497 setLastLogEntry(context, 1, 100,
498 new MockRaftActorContext.MockPayload(""));
499 context.getReplicatedLog().setSnapshotIndex(99);
501 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
502 newReplicatedLogEntry(2, 101, "foo"));
504 // The new commitIndex is 101
505 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
507 follower = createBehavior(context);
508 follower.handleMessage(leaderActor, appendEntries);
510 assertEquals("getLastApplied", 101L, context.getLastApplied());
514 * This test verifies that when an AppendEntries is received with a prevLogTerm
515 * which does not match the term that is in RaftActors log entry at prevLogIndex
516 * then the RaftActor does not change it's state and it returns a failure.
519 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
520 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
522 MockRaftActorContext context = createActorContext();
524 AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
526 follower = createBehavior(context);
528 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
530 Assert.assertSame(follower, newBehavior);
532 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
533 AppendEntriesReply.class);
535 assertEquals("isSuccess", false, reply.isSuccess());
539 public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
540 logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
542 MockRaftActorContext context = createActorContext();
543 context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
544 context.getReplicatedLog().setSnapshotIndex(4);
545 context.getReplicatedLog().setSnapshotTerm(3);
547 AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
549 follower = createBehavior(context);
551 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
553 Assert.assertSame(follower, newBehavior);
555 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
557 assertEquals("isSuccess", true, reply.isSuccess());
561 * This test verifies that when a new AppendEntries message is received with
562 * new entries and the logs of the sender and receiver match that the new
563 * entries get added to the log and the log is incremented by the number of
564 * entries received in appendEntries.
567 public void testHandleAppendEntriesAddNewEntries() {
568 logStart("testHandleAppendEntriesAddNewEntries");
570 MockRaftActorContext context = createActorContext();
572 // First set the receivers term to lower number
573 context.getTermInformation().update(1, "test");
575 // Prepare the receivers log
576 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
577 log.append(newReplicatedLogEntry(1, 0, "zero"));
578 log.append(newReplicatedLogEntry(1, 1, "one"));
579 log.append(newReplicatedLogEntry(1, 2, "two"));
581 context.setReplicatedLog(log);
583 // Prepare the entries to be sent with AppendEntries
584 List<ReplicatedLogEntry> entries = new ArrayList<>();
585 entries.add(newReplicatedLogEntry(1, 3, "three"));
586 entries.add(newReplicatedLogEntry(1, 4, "four"));
588 // Send appendEntries with the same term as was set on the receiver
589 // before the new behavior was created (1 in this case)
590 // This will not work for a Candidate because as soon as a Candidate
591 // is created it increments the term
592 short leaderPayloadVersion = 10;
593 String leaderId = "leader-1";
594 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
596 follower = createBehavior(context);
598 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
600 Assert.assertSame(follower, newBehavior);
602 assertEquals("Next index", 5, log.last().getIndex() + 1);
603 assertEquals("Entry 3", entries.get(0), log.get(3));
604 assertEquals("Entry 4", entries.get(1), log.get(4));
606 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
607 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
609 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
613 * This test verifies that when a new AppendEntries message is received with
614 * new entries and the logs of the sender and receiver are out-of-sync that
615 * the log is first corrected by removing the out of sync entries from the
616 * log and then adding in the new entries sent with the AppendEntries message.
619 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
620 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
622 MockRaftActorContext context = createActorContext();
624 // First set the receivers term to lower number
625 context.getTermInformation().update(1, "test");
627 // Prepare the receivers log
628 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
629 log.append(newReplicatedLogEntry(1, 0, "zero"));
630 log.append(newReplicatedLogEntry(1, 1, "one"));
631 log.append(newReplicatedLogEntry(1, 2, "two"));
633 context.setReplicatedLog(log);
635 // Prepare the entries to be sent with AppendEntries
636 List<ReplicatedLogEntry> entries = new ArrayList<>();
637 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
638 entries.add(newReplicatedLogEntry(2, 3, "three"));
640 // Send appendEntries with the same term as was set on the receiver
641 // before the new behavior was created (1 in this case)
642 // This will not work for a Candidate because as soon as a Candidate
643 // is created it increments the term
644 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
646 follower = createBehavior(context);
648 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
650 Assert.assertSame(follower, newBehavior);
652 // The entry at index 2 will be found out-of-sync with the leader
653 // and will be removed
654 // Then the two new entries will be added to the log
655 // Thus making the log to have 4 entries
656 assertEquals("Next index", 4, log.last().getIndex() + 1);
657 //assertEquals("Entry 2", entries.get(0), log.get(2));
659 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
661 // Check that the entry at index 2 has the new data
662 assertEquals("Entry 2", entries.get(0), log.get(2));
664 assertEquals("Entry 3", entries.get(1), log.get(3));
666 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
670 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
671 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
673 MockRaftActorContext context = createActorContext();
675 // First set the receivers term to lower number
676 context.getTermInformation().update(1, "test");
678 // Prepare the receivers log
679 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
680 log.append(newReplicatedLogEntry(1, 0, "zero"));
681 log.append(newReplicatedLogEntry(1, 1, "one"));
682 log.append(newReplicatedLogEntry(1, 2, "two"));
684 context.setReplicatedLog(log);
686 // Prepare the entries to be sent with AppendEntries
687 List<ReplicatedLogEntry> entries = new ArrayList<>();
688 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
689 entries.add(newReplicatedLogEntry(2, 3, "three"));
691 // Send appendEntries with the same term as was set on the receiver
692 // before the new behavior was created (1 in this case)
693 // This will not work for a Candidate because as soon as a Candidate
694 // is created it increments the term
695 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
697 context.setRaftPolicy(createRaftPolicy(false, true));
698 follower = createBehavior(context);
700 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
702 Assert.assertSame(follower, newBehavior);
704 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
708 public void testHandleAppendEntriesPreviousLogEntryMissing() {
709 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
711 final MockRaftActorContext context = createActorContext();
713 // Prepare the receivers log
714 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
715 log.append(newReplicatedLogEntry(1, 0, "zero"));
716 log.append(newReplicatedLogEntry(1, 1, "one"));
717 log.append(newReplicatedLogEntry(1, 2, "two"));
719 context.setReplicatedLog(log);
721 // Prepare the entries to be sent with AppendEntries
722 List<ReplicatedLogEntry> entries = new ArrayList<>();
723 entries.add(newReplicatedLogEntry(1, 4, "four"));
725 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
727 follower = createBehavior(context);
729 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
731 Assert.assertSame(follower, newBehavior);
733 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
737 public void testHandleAppendEntriesWithExistingLogEntry() {
738 logStart("testHandleAppendEntriesWithExistingLogEntry");
740 MockRaftActorContext context = createActorContext();
742 context.getTermInformation().update(1, "test");
744 // Prepare the receivers log
745 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
746 log.append(newReplicatedLogEntry(1, 0, "zero"));
747 log.append(newReplicatedLogEntry(1, 1, "one"));
749 context.setReplicatedLog(log);
751 // Send the last entry again.
752 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
754 follower = createBehavior(context);
756 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
758 assertEquals("Next index", 2, log.last().getIndex() + 1);
759 assertEquals("Entry 1", entries.get(0), log.get(1));
761 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
763 // Send the last entry again and also a new one.
765 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
767 MessageCollectorActor.clearMessages(leaderActor);
768 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
770 assertEquals("Next index", 3, log.last().getIndex() + 1);
771 assertEquals("Entry 1", entries.get(0), log.get(1));
772 assertEquals("Entry 2", entries.get(1), log.get(2));
774 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
778 public void testHandleAppendEntriesAfterInstallingSnapshot() {
779 logStart("testHandleAppendAfterInstallingSnapshot");
781 MockRaftActorContext context = createActorContext();
783 // Prepare the receivers log
784 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
786 // Set up a log as if it has been snapshotted
787 log.setSnapshotIndex(3);
788 log.setSnapshotTerm(1);
790 context.setReplicatedLog(log);
792 // Prepare the entries to be sent with AppendEntries
793 List<ReplicatedLogEntry> entries = new ArrayList<>();
794 entries.add(newReplicatedLogEntry(1, 4, "four"));
796 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
798 follower = createBehavior(context);
800 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
802 Assert.assertSame(follower, newBehavior);
804 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
809 * This test verifies that when InstallSnapshot is received by
810 * the follower its applied correctly.
813 public void testHandleInstallSnapshot() {
814 logStart("testHandleInstallSnapshot");
816 MockRaftActorContext context = createActorContext();
817 context.getTermInformation().update(1, "leader");
819 follower = createBehavior(context);
821 ByteString bsSnapshot = createSnapshot();
823 int snapshotLength = bsSnapshot.size();
825 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
826 int lastIncludedIndex = 1;
828 InstallSnapshot lastInstallSnapshot = null;
830 for (int i = 0; i < totalChunks; i++) {
831 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
832 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
833 chunkData, chunkIndex, totalChunks);
834 follower.handleMessage(leaderActor, lastInstallSnapshot);
835 offset = offset + 50;
840 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
841 ApplySnapshot.class);
842 Snapshot snapshot = applySnapshot.getSnapshot();
843 assertNotNull(lastInstallSnapshot);
844 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
845 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
846 snapshot.getLastAppliedTerm());
847 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
848 snapshot.getLastAppliedIndex());
849 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
850 assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
851 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
852 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
853 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
854 applySnapshot.getCallback().onSuccess();
856 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
857 leaderActor, InstallSnapshotReply.class);
858 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
861 for (InstallSnapshotReply reply: replies) {
862 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
863 assertEquals("getTerm", 1, reply.getTerm());
864 assertEquals("isSuccess", true, reply.isSuccess());
865 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
868 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
872 * Verify that when an AppendEntries is sent to a follower during a snapshot install
873 * the Follower short-circuits the processing of the AppendEntries message.
876 public void testReceivingAppendEntriesDuringInstallSnapshot() {
877 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
879 MockRaftActorContext context = createActorContext();
881 follower = createBehavior(context);
883 ByteString bsSnapshot = createSnapshot();
884 int snapshotLength = bsSnapshot.size();
886 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
887 int lastIncludedIndex = 1;
889 // Check that snapshot installation is not in progress
890 assertNull(follower.getSnapshotTracker());
892 // Make sure that we have more than 1 chunk to send
893 assertTrue(totalChunks > 1);
895 // Send an install snapshot with the first chunk to start the process of installing a snapshot
896 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
897 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
898 chunkData, 1, totalChunks));
900 // Check if snapshot installation is in progress now
901 assertNotNull(follower.getSnapshotTracker());
903 // Send an append entry
904 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
905 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
907 follower.handleMessage(leaderActor, appendEntries);
909 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
910 assertEquals("isSuccess", true, reply.isSuccess());
911 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
912 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
913 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
915 assertNotNull(follower.getSnapshotTracker());
919 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
920 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
922 MockRaftActorContext context = createActorContext();
924 follower = createBehavior(context);
926 ByteString bsSnapshot = createSnapshot();
927 int snapshotLength = bsSnapshot.size();
929 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
930 int lastIncludedIndex = 1;
932 // Check that snapshot installation is not in progress
933 assertNull(follower.getSnapshotTracker());
935 // Make sure that we have more than 1 chunk to send
936 assertTrue(totalChunks > 1);
938 // Send an install snapshot with the first chunk to start the process of installing a snapshot
939 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
940 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
941 chunkData, 1, totalChunks));
943 // Check if snapshot installation is in progress now
944 assertNotNull(follower.getSnapshotTracker());
946 // Send appendEntries with a new term and leader.
947 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
948 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
950 follower.handleMessage(leaderActor, appendEntries);
952 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
953 assertEquals("isSuccess", true, reply.isSuccess());
954 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
955 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
956 assertEquals("getTerm", 2, reply.getTerm());
958 assertNull(follower.getSnapshotTracker());
962 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
963 logStart("testInitialSyncUpWithHandleInstallSnapshot");
965 MockRaftActorContext context = createActorContext();
966 context.setCommitIndex(-1);
968 follower = createBehavior(context);
970 ByteString bsSnapshot = createSnapshot();
972 int snapshotLength = bsSnapshot.size();
974 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
975 int lastIncludedIndex = 1;
977 InstallSnapshot lastInstallSnapshot = null;
979 for (int i = 0; i < totalChunks; i++) {
980 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
981 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
982 chunkData, chunkIndex, totalChunks);
983 follower.handleMessage(leaderActor, lastInstallSnapshot);
984 offset = offset + 50;
989 FollowerInitialSyncUpStatus syncStatus =
990 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
992 assertFalse(syncStatus.isInitialSyncDone());
994 // Clear all the messages
995 MessageCollectorActor.clearMessages(followerActor);
997 context.setLastApplied(101);
998 context.setCommitIndex(101);
999 setLastLogEntry(context, 1, 101,
1000 new MockRaftActorContext.MockPayload(""));
1002 List<ReplicatedLogEntry> entries = Arrays.asList(
1003 newReplicatedLogEntry(2, 101, "foo"));
1005 // The new commitIndex is 101
1006 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
1007 follower.handleMessage(leaderActor, appendEntries);
1009 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
1011 assertTrue(syncStatus.isInitialSyncDone());
1015 public void testHandleOutOfSequenceInstallSnapshot() {
1016 logStart("testHandleOutOfSequenceInstallSnapshot");
1018 MockRaftActorContext context = createActorContext();
1020 follower = createBehavior(context);
1022 ByteString bsSnapshot = createSnapshot();
1024 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1025 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1026 follower.handleMessage(leaderActor, installSnapshot);
1028 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1029 InstallSnapshotReply.class);
1031 assertEquals("isSuccess", false, reply.isSuccess());
1032 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1033 assertEquals("getTerm", 1, reply.getTerm());
1034 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1036 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1040 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1041 MockRaftActorContext context = createActorContext();
1043 Stopwatch stopwatch = Stopwatch.createStarted();
1045 follower = createBehavior(context);
1047 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1049 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1051 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1053 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1054 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1058 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1059 MockRaftActorContext context = createActorContext();
1060 context.setConfigParams(new DefaultConfigParamsImpl() {
1062 public FiniteDuration getElectionTimeOutInterval() {
1063 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1067 context.setRaftPolicy(createRaftPolicy(false, false));
1069 follower = createBehavior(context);
1071 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1072 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1073 assertSame("handleMessage result", follower, newBehavior);
1077 public void testFollowerSchedulesElectionIfNonVoting() {
1078 MockRaftActorContext context = createActorContext();
1079 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1080 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1081 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1082 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1084 follower = new Follower(context, "leader", (short)1);
1086 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1087 ElectionTimeout.class);
1088 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1089 assertSame("handleMessage result", follower, newBehavior);
1090 assertNull("Expected null leaderId", follower.getLeaderId());
1094 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1095 MockRaftActorContext context = createActorContext();
1096 follower = createBehavior(context);
1097 follower.handleMessage(leaderActor, new RaftRPC() {
1098 private static final long serialVersionUID = 1L;
1101 public long getTerm() {
1105 verify(follower).scheduleElection(any(FiniteDuration.class));
1109 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1110 MockRaftActorContext context = createActorContext();
1111 follower = createBehavior(context);
1112 follower.handleMessage(leaderActor, "non-raft-rpc");
1113 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1117 public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1118 String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1121 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1123 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1124 config.setSnapshotBatchCount(2);
1125 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1127 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1128 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1129 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1130 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1131 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1132 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1133 followerRaftActor.set(followerActorRef.underlyingActor());
1134 followerRaftActor.get().waitForInitializeBehaviorComplete();
1136 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1137 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1138 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1140 List<ReplicatedLogEntry> entries = Arrays.asList(
1141 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1143 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1145 followerActorRef.tell(appendEntries, leaderActor);
1147 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1148 assertEquals("isSuccess", true, reply.isSuccess());
1150 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1152 InMemoryJournal.waitForDeleteMessagesComplete(id);
1153 InMemoryJournal.waitForWriteMessagesComplete(id);
1154 // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1155 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1156 // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1157 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1158 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1159 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1160 assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1162 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1163 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1164 assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1165 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1166 assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1167 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1168 MockRaftActor.fromState(snapshot.getState()));
1172 public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1173 String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1176 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1178 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1179 config.setSnapshotBatchCount(2);
1180 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1182 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1183 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1184 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1185 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1186 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1187 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1188 followerRaftActor.set(followerActorRef.underlyingActor());
1189 followerRaftActor.get().waitForInitializeBehaviorComplete();
1191 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1192 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1193 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1195 List<ReplicatedLogEntry> entries = Arrays.asList(
1196 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1197 newReplicatedLogEntry(1, 2, "three"));
1199 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1201 followerActorRef.tell(appendEntries, leaderActor);
1203 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1204 assertEquals("isSuccess", true, reply.isSuccess());
1206 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1208 InMemoryJournal.waitForDeleteMessagesComplete(id);
1209 InMemoryJournal.waitForWriteMessagesComplete(id);
1210 // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1211 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1212 // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1213 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1214 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1215 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1216 assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1218 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1219 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1220 assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1221 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1222 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1223 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1224 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1226 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1227 assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1229 // Reinstate the actor from persistence
1231 actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1233 followerActorRef = actorFactory.createTestActor(builder.props()
1234 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1235 followerRaftActor.set(followerActorRef.underlyingActor());
1236 followerRaftActor.get().waitForInitializeBehaviorComplete();
1238 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1239 assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1240 assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1241 assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1242 assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1243 entries.get(2).getData()), followerRaftActor.get().getState());
1247 public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1248 String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1251 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1253 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1254 config.setSnapshotBatchCount(1);
1255 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1257 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1258 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1259 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1260 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1261 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1262 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1263 followerRaftActor.set(followerActorRef.underlyingActor());
1264 followerRaftActor.get().waitForInitializeBehaviorComplete();
1266 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1267 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1268 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1270 List<ReplicatedLogEntry> entries = Arrays.asList(
1271 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1272 newReplicatedLogEntry(1, 2, "three"));
1274 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1276 followerActorRef.tell(appendEntries, leaderActor);
1278 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1279 assertEquals("isSuccess", true, reply.isSuccess());
1281 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1283 InMemoryJournal.waitForDeleteMessagesComplete(id);
1284 InMemoryJournal.waitForWriteMessagesComplete(id);
1285 // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1286 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1287 // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1288 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1289 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1290 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1291 assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1293 assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1294 assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1295 assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1296 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1297 assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1298 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1299 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1300 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1301 MockRaftActor.fromState(snapshot.getState()));
1304 @SuppressWarnings("checkstyle:IllegalCatch")
1305 private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1306 final AtomicReference<MockRaftActor> followerRaftActor) {
1307 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1309 public void createSnapshot(final ActorRef actorRef,
1310 final java.util.Optional<OutputStream> installSnapshotStream) {
1312 actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1313 installSnapshotStream), actorRef);
1314 } catch (RuntimeException e) {
1316 } catch (Exception e) {
1317 throw new RuntimeException(e);
1322 public void applySnapshot(final State snapshotState) {
1326 public State deserializeSnapshot(final ByteSource snapshotBytes) {
1327 throw new UnsupportedOperationException();
1330 return snapshotCohort;
1333 public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1334 int snapshotLength = bs.size();
1336 int size = chunkSize;
1337 if (chunkSize > snapshotLength) {
1338 size = snapshotLength;
1340 if (start + chunkSize > snapshotLength) {
1341 size = snapshotLength - start;
1345 byte[] nextChunk = new byte[size];
1346 bs.copyTo(nextChunk, start, 0, size);
1350 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1351 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1352 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1355 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1356 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1357 final boolean expForceInstallSnapshot) {
1359 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1360 AppendEntriesReply.class);
1362 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1363 assertEquals("getTerm", expTerm, reply.getTerm());
1364 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1365 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1366 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1367 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1368 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1372 private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1373 return new SimpleReplicatedLogEntry(index, term,
1374 new MockRaftActorContext.MockPayload(data));
1377 private ByteString createSnapshot() {
1378 HashMap<String, String> followerSnapshot = new HashMap<>();
1379 followerSnapshot.put("1", "A");
1380 followerSnapshot.put("2", "B");
1381 followerSnapshot.put("3", "C");
1383 return toByteString(followerSnapshot);
1387 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1388 final ActorRef actorRef, final RaftRPC rpc) {
1389 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1391 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1392 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1396 protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1397 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1398 assertEquals("isSuccess", true, reply.isSuccess());