2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.JavaTestKit;
25 import akka.testkit.TestActorRef;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Stopwatch;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.OutputStream;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.MockRaftActor;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
48 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
55 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
56 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
63 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
64 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
65 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
68 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
70 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
71 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
72 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
73 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
74 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import scala.concurrent.duration.FiniteDuration;
78 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
// Message-collecting actor handed to createActorContext() as the follower's own actor ref; tests read
// messages the follower sends to itself (e.g. FollowerInitialSyncUpStatus, ApplySnapshot) from it.
80 private final ActorRef followerActor = actorFactory.createActor(
81 MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
// Message-collecting actor passed as the sender of every message given to the follower; tests read the
// follower's replies (AppendEntriesReply, RequestVoteReply, InstallSnapshotReply) from it.
83 private final ActorRef leaderActor = actorFactory.createActor(
84 MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
// Behavior under test; assigned by each test method and checked for null in tearDown().
86 private Follower follower;
// Payload version set on every context built by createActorContext(ActorRef).
88 private final short payloadVersion = 5;
// Cleanup hook (presumably the JUnit @After method; the annotation is not visible in this excerpt).
// It acts only when a test actually assigned `follower`; the guarded body is not visible here.
92 public void tearDown() {
93 if (follower != null) {
// Creates the behavior under test: a Follower wrapped in a Mockito spy, so tests can verify
// calls made on it (e.g. scheduleElection) while the real Follower logic still runs.
101 protected Follower createBehavior(final RaftActorContext actorContext) {
102 return spy(new Follower(actorContext));
// Convenience overload: builds the mock context around the shared followerActor collector.
106 protected MockRaftActorContext createActorContext() {
107 return createActorContext(followerActor);
// Builds a MockRaftActorContext with id "follower" on the test actor system, using the given
// actor ref, and applies this test class's payloadVersion to it.
// NOTE(review): the return statement is not visible in this excerpt.
111 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
112 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
113 context.setPayloadVersion(payloadVersion);
// Verifies that simply constructing a Follower results in a TimeoutNow message arriving at the
// follower's own actor.
118 public void testThatAnElectionTimeoutIsTriggered() {
119 MockRaftActorContext actorContext = createActorContext();
120 follower = new Follower(actorContext);
// The third argument is six times the configured election timeout interval, in milliseconds
// (presumably the maximum time to wait for the message - confirm against MessageCollectorActor).
122 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
123 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
// Verifies that an ElectionTimeout handled after a full election-timeout interval with no
// leader traffic makes the follower return a Candidate behavior.
127 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
128 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
130 MockRaftActorContext context = createActorContext();
131 follower = new Follower(context);
// Let one whole election timeout interval elapse without any message from a leader.
133 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
134 TimeUnit.MILLISECONDS);
135 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
137 assertTrue(raftBehavior instanceof Candidate);
// Verifies that an ElectionTimeout handled shortly after an AppendEntries from the leader does
// not change the behavior: handleMessage must still return a Follower. The scenario is run
// twice with different sleep offsets.
// NOTE(review): the trailing arguments of both AppendEntries constructor calls are on lines
// that are not visible in this excerpt.
141 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
142 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
144 MockRaftActorContext context = createActorContext();
// Use a 100 ms heartbeat and an election timeout factor of 4 for this test.
145 ((DefaultConfigParamsImpl) context.getConfigParams())
146 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
147 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
149 follower = new Follower(context);
150 context.setCurrentBehavior(follower);
// Round 1: wait until 100 ms before the election timeout interval, then deliver a leader message.
152 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
153 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
154 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
// 130 ms later the original timeout interval has passed, but the leader was heard from recently.
157 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
158 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
159 assertTrue(raftBehavior instanceof Follower);
// Round 2: same scenario with a 150 ms lead and a 200 ms follow-up sleep.
161 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
162 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
163 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
166 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
167 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
168 assertTrue(raftBehavior instanceof Follower);
// Verifies that a RequestVote carrying the follower's current term is granted when the follower
// has not voted yet (votedFor is null), and that granting the vote schedules an election.
// NOTE(review): the declaration of `term` is not visible in this excerpt.
172 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
173 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
175 MockRaftActorContext context = createActorContext();
// Current term is `term`, with no vote cast.
177 context.getTermInformation().update(term, null);
179 follower = createBehavior(context);
181 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
183 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
185 assertEquals("isVoteGranted", true, reply.isVoteGranted());
186 assertEquals("getTerm", term, reply.getTerm());
// createBehavior() returns a Mockito spy, which is what makes this verification possible.
187 verify(follower).scheduleElection(any(FiniteDuration.class));
// Verifies that a RequestVote carrying the follower's current term is rejected when the follower
// already voted for a different candidate ("test" vs. "candidate"), and that no election is
// scheduled as a result.
// NOTE(review): the declaration of `term` is not visible in this excerpt.
191 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
192 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
194 MockRaftActorContext context = createActorContext();
// Current term is `term`, and the vote for it already went to "test".
196 context.getTermInformation().update(term, "test");
198 follower = createBehavior(context);
200 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
202 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
204 assertEquals("isVoteGranted", false, reply.isVoteGranted());
205 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
// Verifies the first AppendEntries handled by a follower whose log holds a single entry
// (term 1, index 100) with snapshot index 99: the message's prevLogIndex/prevLogTerm (100/1)
// match that entry, so the reply is successful, and the follower reports that its initial
// sync-up is not yet done.
210 public void testHandleFirstAppendEntries() {
211 logStart("testHandleFirstAppendEntries");
213 MockRaftActorContext context = createActorContext();
// Replace the context's default log content with one entry at index 100.
214 context.getReplicatedLog().clear(0,2);
215 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
216 context.getReplicatedLog().setSnapshotIndex(99);
218 List<ReplicatedLogEntry> entries = Arrays.asList(
219 newReplicatedLogEntry(2, 101, "foo"));
221 Assert.assertEquals(1, context.getReplicatedLog().size());
223 // The new commitIndex is 101
224 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
226 follower = createBehavior(context);
227 follower.handleMessage(leaderActor, appendEntries);
229 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
230 FollowerInitialSyncUpStatus.class);
231 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
233 assertFalse(syncStatus.isInitialSyncDone());
234 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies the first AppendEntries when the leader sends prevLogIndex/prevLogTerm of -1/-1 and
// the follower's log is left as created by createActorContext(): the reply must report failure
// and the follower reports that its initial sync-up is not done.
238 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
// Log label corrected to this test's own name (it was copy-pasted from testHandleFirstAppendEntries).
239 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOne");
241 MockRaftActorContext context = createActorContext();
243 List<ReplicatedLogEntry> entries = Arrays.asList(
244 newReplicatedLogEntry(2, 101, "foo"));
246 // The new commitIndex is 101
247 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
249 follower = createBehavior(context);
250 follower.handleMessage(leaderActor, appendEntries);
252 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
253 FollowerInitialSyncUpStatus.class);
254 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
256 assertFalse(syncStatus.isInitialSyncDone());
257 assertFalse("append entries reply should be false", reply.isSuccess());
// Verifies the first AppendEntries when the leader sends prevLogIndex/prevLogTerm of -1/-1 and
// the follower's log contains an entry at index 100 (snapshot index 99) - the same index passed
// in the message's seventh argument (per the test name, the replicatedToAllIndex). The reply
// must be successful, and the follower reports that its initial sync-up is not done.
261 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
// Log label corrected to this test's own name (it was copy-pasted from testHandleFirstAppendEntries).
262 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
264 MockRaftActorContext context = createActorContext();
// Replace the context's default log content with one entry at index 100.
265 context.getReplicatedLog().clear(0,2);
266 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
267 context.getReplicatedLog().setSnapshotIndex(99);
269 List<ReplicatedLogEntry> entries = Arrays.asList(
270 newReplicatedLogEntry(2, 101, "foo"));
272 // The new commitIndex is 101
273 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
275 follower = createBehavior(context);
276 follower.handleMessage(leaderActor, appendEntries);
278 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
279 FollowerInitialSyncUpStatus.class);
280 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
282 assertFalse(syncStatus.isInitialSyncDone());
283 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies the first AppendEntries when the leader sends prevLogIndex/prevLogTerm of -1/-1, the
// follower's log has been cleared, and its snapshot index is 100 - the same index passed in the
// message's seventh argument (per the test name, the replicatedToAllIndex). The reply must be
// successful, and the follower reports that its initial sync-up is not done.
287 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
// Log label corrected to this test's own name (it was copy-pasted from testHandleFirstAppendEntries).
288 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
290 MockRaftActorContext context = createActorContext();
// Clear the log; index 100 is now covered only by the snapshot index.
291 context.getReplicatedLog().clear(0,2);
292 context.getReplicatedLog().setSnapshotIndex(100);
294 List<ReplicatedLogEntry> entries = Arrays.asList(
295 newReplicatedLogEntry(2, 101, "foo"));
297 // The new commitIndex is 101
298 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
300 follower = createBehavior(context);
301 follower.handleMessage(leaderActor, appendEntries);
303 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
304 FollowerInitialSyncUpStatus.class);
305 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
307 assertFalse(syncStatus.isInitialSyncDone());
308 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies the first AppendEntries when the leader sends prevLogIndex/prevLogTerm of -1/-1, the
// follower's log is cleared with snapshot index 100, and the single payload entry is at index
// 105 - not adjacent to the snapshot index. The reply must report failure, and the follower
// reports that its initial sync-up is not done.
// NOTE(review): the line opening the logStart( call is not visible in this excerpt; only its
// string argument is.
312 public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
314 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
316 MockRaftActorContext context = createActorContext();
317 context.getReplicatedLog().clear(0,2);
318 context.getReplicatedLog().setSnapshotIndex(100);
320 List<ReplicatedLogEntry> entries = Arrays.asList(
321 newReplicatedLogEntry(2, 105, "foo"));
323 // The new commitIndex is 105
324 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
326 follower = createBehavior(context);
327 follower.handleMessage(leaderActor, appendEntries);
329 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
330 FollowerInitialSyncUpStatus.class);
331 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
333 assertFalse(syncStatus.isInitialSyncDone());
334 assertFalse("append entries reply should be false", reply.isSuccess());
// Walks the follower through initial sync-up with one leader:
//   1. the first AppendEntries produces FollowerInitialSyncUpStatus(initialSyncDone = false);
//   2. after the context's lastApplied/commitIndex and last log entry are moved to 101, the
//      next AppendEntries produces FollowerInitialSyncUpStatus(initialSyncDone = true);
//   3. handling the same AppendEntries again produces no further status message.
338 public void testHandleSyncUpAppendEntries() {
339 logStart("testHandleSyncUpAppendEntries");
341 MockRaftActorContext context = createActorContext();
343 List<ReplicatedLogEntry> entries = Arrays.asList(
344 newReplicatedLogEntry(2, 101, "foo"));
346 // The new commitIndex is 101
347 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
349 follower = createBehavior(context);
350 follower.handleMessage(leaderActor, appendEntries);
352 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
353 FollowerInitialSyncUpStatus.class);
355 assertFalse(syncStatus.isInitialSyncDone());
357 // Clear all the messages
358 MessageCollectorActor.clearMessages(followerActor);
// Move the follower's applied/committed state and last log entry up to index 101.
360 context.setLastApplied(101);
361 context.setCommitIndex(101);
362 setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
364 entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
366 // The new commitIndex is 102
367 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
368 follower.handleMessage(leaderActor, appendEntries);
370 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
372 assertTrue(syncStatus.isInitialSyncDone());
374 MessageCollectorActor.clearMessages(followerActor);
376 // Sending the same message again should not generate another message
378 follower.handleMessage(leaderActor, appendEntries);
// getFirstMatching (unlike expectFirstMatching) is used because null is the expected result here.
380 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
382 assertNull(syncStatus);
// Verifies that when a different leader ("leader-2") sends AppendEntries while the follower is
// still in its initial sync-up with "leader-1", the follower again reports
// FollowerInitialSyncUpStatus(initialSyncDone = false).
386 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
387 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
389 MockRaftActorContext context = createActorContext();
391 List<ReplicatedLogEntry> entries = Arrays.asList(
392 newReplicatedLogEntry(2, 101, "foo"));
394 // The new commitIndex is 101
395 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
397 follower = createBehavior(context);
398 follower.handleMessage(leaderActor, appendEntries);
400 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
401 FollowerInitialSyncUpStatus.class);
403 assertFalse(syncStatus.isInitialSyncDone());
405 // Clear all the messages
406 MessageCollectorActor.clearMessages(followerActor);
// The follower has applied up to index 100, one short of the commit index (101) it was sent.
408 context.setLastApplied(100);
409 setLastLogEntry(context, 1, 100,
410 new MockRaftActorContext.MockPayload(""));
412 entries = Arrays.asList(
413 newReplicatedLogEntry(2, 101, "foo"));
415 // leader-2 is becoming the leader now; it sends prevLogIndex 45 and commitIndex 46
416 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
417 follower.handleMessage(leaderActor, appendEntries);
419 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
421 // We get a new message saying initial status is not done
422 assertFalse(syncStatus.isInitialSyncDone());
// Verifies that the initial sync-up status is reported afresh when the leader changes after
// sync-up had completed:
//   1. first AppendEntries from "leader-1"   -> initialSyncDone = false;
//   2. follower state moved to index 101, next AppendEntries from "leader-1"
//                                            -> initialSyncDone = true;
//   3. follower state moved back to index 100, AppendEntries from "leader-2"
//                                            -> initialSyncDone = false again.
426 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
427 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
429 MockRaftActorContext context = createActorContext();
431 List<ReplicatedLogEntry> entries = Arrays.asList(
432 newReplicatedLogEntry(2, 101, "foo"));
434 // The new commitIndex is 101
435 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
437 follower = createBehavior(context);
438 follower.handleMessage(leaderActor, appendEntries);
440 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
441 FollowerInitialSyncUpStatus.class);
443 assertFalse(syncStatus.isInitialSyncDone());
445 // Clear all the messages
446 MessageCollectorActor.clearMessages(followerActor);
// Move the follower's applied/committed state and last log entry up to index 101.
448 context.setLastApplied(101);
449 context.setCommitIndex(101);
450 setLastLogEntry(context, 1, 101,
451 new MockRaftActorContext.MockPayload(""));
453 entries = Arrays.asList(
454 newReplicatedLogEntry(2, 101, "foo"));
456 // The new commitIndex is 102
457 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
458 follower.handleMessage(leaderActor, appendEntries);
460 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
462 assertTrue(syncStatus.isInitialSyncDone());
464 // Clear all the messages
465 MessageCollectorActor.clearMessages(followerActor);
// Put the follower's applied index and last log entry back at 100 before the leader changes.
467 context.setLastApplied(100);
468 setLastLogEntry(context, 1, 100,
469 new MockRaftActorContext.MockPayload(""));
471 entries = Arrays.asList(
472 newReplicatedLogEntry(2, 101, "foo"));
474 // leader-2 is becoming the leader now; it sends prevLogIndex 45 and commitIndex 46
475 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
476 follower.handleMessage(leaderActor, appendEntries);
478 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
480 // We get a new message saying initial status is not done
481 assertFalse(syncStatus.isInitialSyncDone());
485 * This test verifies that when an AppendEntries RPC is received by a RaftActor
486 * with a commitIndex that is greater than what has been applied to the
487 * state machine of the RaftActor, the RaftActor applies the state and
488 * sets its current applied state to the commitIndex of the sender.
// Starts with lastApplied = 100 and a last log entry at index 100 (snapshot index 99), handles
// an AppendEntries whose commit index is 101, and asserts that the context's lastApplied
// becomes 101.
491 public void testHandleAppendEntriesWithNewerCommitIndex() {
492 logStart("testHandleAppendEntriesWithNewerCommitIndex");
494 MockRaftActorContext context = createActorContext();
496 context.setLastApplied(100);
497 setLastLogEntry(context, 1, 100,
498 new MockRaftActorContext.MockPayload(""));
499 context.getReplicatedLog().setSnapshotIndex(99);
501 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
502 newReplicatedLogEntry(2, 101, "foo"));
504 // The new commitIndex is 101
505 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
507 follower = createBehavior(context);
508 follower.handleMessage(leaderActor, appendEntries);
510 assertEquals("getLastApplied", 101L, context.getLastApplied());
514 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
515 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
516 * then the RaftActor does not change its state and it returns a failure.
// Sends an AppendEntries with term 100 and prevLogIndex/prevLogTerm of 0/0 to a follower whose
// current term is 95, then asserts that the follower keeps the same behavior instance and
// replies with isSuccess == false.
// NOTE(review): the final argument(s) of the AppendEntries constructor call are on a line that
// is not visible in this excerpt.
519 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
520 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
522 MockRaftActorContext context = createActorContext();
524 // First set the receivers term to lower number
525 context.getTermInformation().update(95, "test");
527 // AppendEntries is now sent with a bigger term
528 // this will set the receivers term to be the same as the sender's term
529 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
532 follower = createBehavior(context);
534 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
536 Assert.assertSame(follower, newBehavior);
538 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
539 AppendEntriesReply.class);
541 assertEquals("isSuccess", false, reply.isSuccess());
545 * This test verifies that when a new AppendEntries message is received with
546 * new entries and the logs of the sender and receiver match that the new
547 * entries get added to the log and the log is incremented by the number of
548 * entries received in appendEntries.
// Receiver log holds indexes 0..2 (term 1); the AppendEntries carries prevLogIndex/prevLogTerm
// of 2/1 and new entries at indexes 3 and 4. Asserts that both entries end up in the log, that
// the behavior records the leader's id and payload version, and that a successful reply is sent.
551 public void testHandleAppendEntriesAddNewEntries() {
552 logStart("testHandleAppendEntriesAddNewEntries");
554 MockRaftActorContext context = createActorContext();
556 // Set the receiver's term to 1, the same term the AppendEntries below carries
557 context.getTermInformation().update(1, "test");
559 // Prepare the receivers log
560 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
561 log.append(newReplicatedLogEntry(1, 0, "zero"));
562 log.append(newReplicatedLogEntry(1, 1, "one"));
563 log.append(newReplicatedLogEntry(1, 2, "two"));
565 context.setReplicatedLog(log);
567 // Prepare the entries to be sent with AppendEntries
568 List<ReplicatedLogEntry> entries = new ArrayList<>();
569 entries.add(newReplicatedLogEntry(1, 3, "three"));
570 entries.add(newReplicatedLogEntry(1, 4, "four"));
572 // Send appendEntries with the same term as was set on the receiver
573 // before the new behavior was created (1 in this case)
574 // This will not work for a Candidate because as soon as a Candidate
575 // is created it increments the term
576 short leaderPayloadVersion = 10;
577 String leaderId = "leader-1";
578 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
580 follower = createBehavior(context);
582 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
584 Assert.assertSame(follower, newBehavior);
586 assertEquals("Next index", 5, log.last().getIndex() + 1);
587 assertEquals("Entry 3", entries.get(0), log.get(3));
588 assertEquals("Entry 4", entries.get(1), log.get(4));
590 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
591 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
// Helper defined outside this excerpt; the arguments presumably are expected term, success flag,
// follower id, last log term and last log index - confirm against its definition.
593 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
597 * This test verifies that when a new AppendEntries message is received with
598 * new entries and the logs of the sender and receiver are out-of-sync that
599 * the log is first corrected by removing the out of sync entries from the
600 * log and then adding in the new entries sent with the AppendEntries message.
// Receiver log holds indexes 0..2 in term 1; the leader (term 2) sends entries for indexes 2 and
// 3 in term 2 with prevLogIndex/prevLogTerm of 1/1. Asserts that the receiver's entry at index 2
// is replaced by the leader's, index 3 is added, index 1 is untouched, and a successful reply
// is sent.
603 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
604 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
606 MockRaftActorContext context = createActorContext();
608 // First set the receivers term to lower number
609 context.getTermInformation().update(1, "test");
611 // Prepare the receivers log
612 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
613 log.append(newReplicatedLogEntry(1, 0, "zero"));
614 log.append(newReplicatedLogEntry(1, 1, "one"));
615 log.append(newReplicatedLogEntry(1, 2, "two"));
617 context.setReplicatedLog(log);
619 // Prepare the entries to be sent with AppendEntries
620 List<ReplicatedLogEntry> entries = new ArrayList<>();
621 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
622 entries.add(newReplicatedLogEntry(2, 3, "three"));
624 // Send appendEntries with term 2, which is higher than the term (1) set on
625 // the receiver above. prevLogIndex/prevLogTerm are 1/1, so the first payload
626 // entry (index 2, term 2) conflicts with the receiver's existing entry at
627 // index 2 (term 1).
628 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
630 follower = createBehavior(context);
632 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
634 Assert.assertSame(follower, newBehavior);
636 // The entry at index 2 will be found out-of-sync with the leader
637 // and will be removed
638 // Then the two new entries will be added to the log
639 // Thus making the log to have 4 entries
640 assertEquals("Next index", 4, log.last().getIndex() + 1);
641 //assertEquals("Entry 2", entries.get(0), log.get(2));
643 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
645 // Check that the entry at index 2 has the new data
646 assertEquals("Entry 2", entries.get(0), log.get(2));
648 assertEquals("Entry 3", entries.get(1), log.get(3));
// Helper defined outside this excerpt; the arguments presumably are expected term, success flag,
// follower id, last log term and last log index - confirm against its definition.
650 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
// Same conflicting-entry setup as testHandleAppendEntriesCorrectReceiverLogEntries, but with a
// raft policy from createRaftPolicy(false, true) installed. Asserts that the follower keeps the
// same behavior instance and sends an unsuccessful reply; the helper's extra `true` argument
// presumably is the expected force-install-snapshot flag (per the test name) - confirm against
// the helper's definition, which is outside this excerpt.
654 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
655 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
657 MockRaftActorContext context = createActorContext();
659 // First set the receivers term to lower number
660 context.getTermInformation().update(1, "test");
662 // Prepare the receivers log
663 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
664 log.append(newReplicatedLogEntry(1, 0, "zero"));
665 log.append(newReplicatedLogEntry(1, 1, "one"));
666 log.append(newReplicatedLogEntry(1, 2, "two"));
668 context.setReplicatedLog(log);
670 // Prepare the entries to be sent with AppendEntries
671 List<ReplicatedLogEntry> entries = new ArrayList<>();
672 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
673 entries.add(newReplicatedLogEntry(2, 3, "three"));
675 // Send appendEntries with term 2, which is higher than the term (1) set on
676 // the receiver above. prevLogIndex/prevLogTerm are 1/1, so the first payload
677 // entry (index 2, term 2) conflicts with the receiver's existing entry at
678 // index 2 (term 1).
679 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
681 context.setRaftPolicy(createRaftPolicy(false, true));
682 follower = createBehavior(context);
684 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
686 Assert.assertSame(follower, newBehavior);
688 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
// Receiver log holds indexes 0..2; the AppendEntries names prevLogIndex 3, which the receiver
// does not have. Asserts that the follower keeps the same behavior instance and sends an
// unsuccessful reply (checked via a helper defined outside this excerpt).
692 public void testHandleAppendEntriesPreviousLogEntryMissing() {
693 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
695 final MockRaftActorContext context = createActorContext();
697 // Prepare the receivers log
698 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
699 log.append(newReplicatedLogEntry(1, 0, "zero"));
700 log.append(newReplicatedLogEntry(1, 1, "one"));
701 log.append(newReplicatedLogEntry(1, 2, "two"));
703 context.setReplicatedLog(log);
705 // Prepare the entries to be sent with AppendEntries
706 List<ReplicatedLogEntry> entries = new ArrayList<>();
707 entries.add(newReplicatedLogEntry(1, 4, "four"));
709 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
711 follower = createBehavior(context);
713 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
715 Assert.assertSame(follower, newBehavior);
717 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
// Verifies that re-sending an entry the receiver already holds is harmless. First the existing
// last entry (index 1) is sent alone and the log must still end at index 1; then the same entry
// is sent together with a new entry at index 2 and the log must end at index 2. Both messages
// must yield a successful reply.
721 public void testHandleAppendEntriesWithExistingLogEntry() {
722 logStart("testHandleAppendEntriesWithExistingLogEntry");
724 MockRaftActorContext context = createActorContext();
726 context.getTermInformation().update(1, "test");
728 // Prepare the receivers log
729 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
730 log.append(newReplicatedLogEntry(1, 0, "zero"));
731 log.append(newReplicatedLogEntry(1, 1, "one"));
733 context.setReplicatedLog(log);
735 // Send the last entry again.
736 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
738 follower = createBehavior(context);
740 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
742 assertEquals("Next index", 2, log.last().getIndex() + 1);
743 assertEquals("Entry 1", entries.get(0), log.get(1));
745 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
747 // Send the last entry again and also a new one.
749 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so the helper below inspects only the reply to the second message.
751 MessageCollectorActor.clearMessages(leaderActor);
752 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
754 assertEquals("Next index", 3, log.last().getIndex() + 1);
755 assertEquals("Entry 1", entries.get(0), log.get(1));
756 assertEquals("Entry 2", entries.get(1), log.get(2));
758 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
// Receiver log has no entries but carries snapshot index 3 / snapshot term 1, as if a snapshot
// had been installed. The AppendEntries names prevLogIndex/prevLogTerm of 3/1 and a new entry at
// index 4. Asserts that the follower keeps the same behavior instance and sends a successful
// reply (checked via a helper defined outside this excerpt).
762 public void testHandleAppendEntriesAfterInstallingSnapshot() {
// Log label corrected to match the method name (it was missing "Entries").
763 logStart("testHandleAppendEntriesAfterInstallingSnapshot");
765 MockRaftActorContext context = createActorContext();
767 // Prepare the receivers log
768 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
770 // Set up a log as if it has been snapshotted
771 log.setSnapshotIndex(3);
772 log.setSnapshotTerm(1);
774 context.setReplicatedLog(log);
776 // Prepare the entries to be sent with AppendEntries
777 List<ReplicatedLogEntry> entries = new ArrayList<>();
778 entries.add(newReplicatedLogEntry(1, 4, "four"));
780 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
782 follower = createBehavior(context);
784 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
786 Assert.assertSame(follower, newBehavior);
788 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
793 * This test verifies that when InstallSnapshot is received by
794 * the follower it is applied correctly.
// Feeds a snapshot to the follower as a sequence of InstallSnapshot chunk messages, then checks:
//   - an ApplySnapshot message reaches the follower's actor, and its Snapshot carries the last
//     included index/term of the final chunk message, the full snapshot bytes as a ByteState,
//     and the election term / votedFor set on the context (1 / "leader");
//   - one successful InstallSnapshotReply per chunk was sent to the leader, with consecutive
//     chunk indexes;
//   - the follower's snapshot tracker is null at the end.
// NOTE(review): the declarations of `chunkSize`, `offset` and `chunkIndex`, the increment of
// `chunkIndex` inside the send loop, and the closing braces are not visible in this excerpt.
797 public void testHandleInstallSnapshot() {
798 logStart("testHandleInstallSnapshot");
800 MockRaftActorContext context = createActorContext();
801 context.getTermInformation().update(1, "leader");
803 follower = createBehavior(context);
805 ByteString bsSnapshot = createSnapshot();
807 int snapshotLength = bsSnapshot.size();
// Ceiling division: a trailing partial chunk counts as one more chunk.
809 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
810 int lastIncludedIndex = 1;
812 InstallSnapshot lastInstallSnapshot = null;
814 for (int i = 0; i < totalChunks; i++) {
815 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
816 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
817 chunkData, chunkIndex, totalChunks);
818 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the offset advances by a literal 50 rather than by chunkSize; the two agree only
// if chunkSize is 50 - confirm against its declaration.
819 offset = offset + 50;
824 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
825 ApplySnapshot.class);
826 Snapshot snapshot = applySnapshot.getSnapshot();
827 assertNotNull(lastInstallSnapshot);
828 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
829 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
830 snapshot.getLastAppliedTerm());
831 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
832 snapshot.getLastAppliedIndex());
833 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
834 assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
835 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
836 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
837 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
// Invoke the apply callback's success path before the replies and the tracker are checked.
838 applySnapshot.getCallback().onSuccess();
840 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
841 leaderActor, InstallSnapshotReply.class);
842 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
845 for (InstallSnapshotReply reply: replies) {
846 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
847 assertEquals("getTerm", 1, reply.getTerm());
848 assertEquals("isSuccess", true, reply.isSuccess());
849 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
852 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
856 * Verify that when an AppendEntries is sent to a follower during a snapshot install
857 * the Follower short-circuits the processing of the AppendEntries message.
860 public void testReceivingAppendEntriesDuringInstallSnapshot() {
861 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
863 MockRaftActorContext context = createActorContext();
865 follower = createBehavior(context);
867 ByteString bsSnapshot = createSnapshot();
868 int snapshotLength = bsSnapshot.size();
// Ceiling division: the number of chunkSize-byte chunks needed to cover the whole snapshot.
870 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
871 int lastIncludedIndex = 1;
873 // Check that snapshot installation is not in progress
874 assertNull(follower.getSnapshotTracker());
876 // Make sure that we have more than 1 chunk to send
877 assertTrue(totalChunks > 1);
879 // Send an install snapshot with the first chunk to start the process of installing a snapshot
880 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
881 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
882 chunkData, 1, totalChunks));
884 // Check if snapshot installation is in progress now
885 assertNotNull(follower.getSnapshotTracker());
887 // Send an append entry
888 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
889 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
891 follower.handleMessage(leaderActor, appendEntries);
// The reply must report success and echo the context's current last log index, last log term and current
// term, i.e. the values read from the context at assertion time rather than fixed numbers.
893 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
894 assertEquals("isSuccess", true, reply.isSuccess());
895 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
896 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
897 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
// The snapshot tracker must still be present after the AppendEntries was handled.
899 assertNotNull(follower.getSnapshotTracker());
// Starts a snapshot install from "leader" in term 1, then delivers an AppendEntries from "new-leader" in term 2
// and expects a successful reply for term 2 with the snapshot tracker cleared.
903 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
904 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
906 MockRaftActorContext context = createActorContext();
908 follower = createBehavior(context);
910 ByteString bsSnapshot = createSnapshot();
911 int snapshotLength = bsSnapshot.size();
// Ceiling division: the number of chunkSize-byte chunks needed to cover the whole snapshot.
913 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
914 int lastIncludedIndex = 1;
916 // Check that snapshot installation is not in progress
917 assertNull(follower.getSnapshotTracker());
919 // Make sure that we have more than 1 chunk to send
920 assertTrue(totalChunks > 1);
922 // Send an install snapshot with the first chunk to start the process of installing a snapshot
923 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
924 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
925 chunkData, 1, totalChunks));
927 // Check if snapshot installation is in progress now
928 assertNotNull(follower.getSnapshotTracker());
930 // Send appendEntries with a new term and leader.
931 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
932 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
934 follower.handleMessage(leaderActor, appendEntries);
// The reply must report success with last log index 2, last log term 2 and term 2, matching the appended entry.
936 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
937 assertEquals("isSuccess", true, reply.isSuccess());
938 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
939 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
940 assertEquals("getTerm", 2, reply.getTerm());
// Unlike the same-leader case, the snapshot tracker is expected to be gone afterwards.
942 assertNull(follower.getSnapshotTracker());
// Delivers a complete chunked snapshot followed by an AppendEntries and checks the FollowerInitialSyncUpStatus
// messages seen by followerActor: not done after the snapshot, done after the AppendEntries.
946 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
// NOTE(review): the label passed to logStart omits the "FollowedByAppendEntries" suffix of this method's name.
947 logStart("testInitialSyncUpWithHandleInstallSnapshot");
949 MockRaftActorContext context = createActorContext();
950 context.setCommitIndex(-1);
952 follower = createBehavior(context);
954 ByteString bsSnapshot = createSnapshot();
956 int snapshotLength = bsSnapshot.size();
// Ceiling division: the number of chunkSize-byte chunks needed to cover the whole snapshot.
958 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
959 int lastIncludedIndex = 1;
961 InstallSnapshot lastInstallSnapshot = null;
// Hand the snapshot to the follower chunk by chunk, with leaderActor as the sender.
963 for (int i = 0; i < totalChunks; i++) {
964 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
965 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
966 chunkData, chunkIndex, totalChunks);
967 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the offset advances by a literal 50 while chunks are cut with chunkSize; the two only
// agree if chunkSize is 50 - confirm, or advance by chunkSize instead.
968 offset = offset + 50;
// After the snapshot chunks the first status message must report that the initial sync is not done.
973 FollowerInitialSyncUpStatus syncStatus =
974 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
976 assertFalse(syncStatus.isInitialSyncDone());
978 // Clear all the messages
979 MessageCollectorActor.clearMessages(followerActor);
// Set last-applied and commit index to 101 and set the context's last log entry before sending AppendEntries.
981 context.setLastApplied(101);
982 context.setCommitIndex(101);
983 setLastLogEntry(context, 1, 101,
984 new MockRaftActorContext.MockPayload(""));
986 List<ReplicatedLogEntry> entries = Arrays.asList(
987 newReplicatedLogEntry(2, 101, "foo"));
989 // The new commitIndex is 101
990 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
991 follower.handleMessage(leaderActor, appendEntries);
// Messages were cleared above, so the first status found now was produced by the AppendEntries; it must
// report the initial sync as done.
993 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
995 assertTrue(syncStatus.isInitialSyncDone());
// Sends, as the very first InstallSnapshot message, a chunk numbered 3 of 3 and expects the follower to
// answer with a failed InstallSnapshotReply and to hold no SnapshotTracker afterwards.
999 public void testHandleOutOfSequenceInstallSnapshot() {
1000 logStart("testHandleOutOfSequenceInstallSnapshot");
1002 MockRaftActorContext context = createActorContext();
1004 follower = createBehavior(context);
1006 ByteString bsSnapshot = createSnapshot();
// Chunk index 3 of 3 arrives without any earlier chunk having been sent in this test.
1008 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1009 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1010 follower.handleMessage(leaderActor, installSnapshot);
1012 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1013 InstallSnapshotReply.class);
// The reply must be a failure with chunk index -1, term 1 and this follower's id.
1015 assertEquals("isSuccess", false, reply.isSuccess());
1016 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1017 assertEquals("getTerm", 1, reply.getTerm());
1018 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1020 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
// Expects a freshly created follower to cause a TimeoutNow at followerActor in less than one election timeout
// interval, and to switch to Candidate when that message is handled.
1024 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1025 MockRaftActorContext context = createActorContext();
// Start timing before the behavior is created so the elapsed time covers its whole start-up.
1027 Stopwatch stopwatch = Stopwatch.createStarted();
1029 follower = createBehavior(context);
1031 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1033 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// The TimeoutNow must have arrived sooner than the configured election timeout interval.
1035 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1037 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1038 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
// Expects the follower to remain the current behavior when it handles TimeoutNow under the raft policy built
// by createRaftPolicy(false, false).
1042 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1043 MockRaftActorContext context = createActorContext();
// Override the config so the election timeout interval is 100 ms.
1044 context.setConfigParams(new DefaultConfigParamsImpl() {
1046 public FiniteDuration getElectionTimeOutInterval() {
1047 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// presumably a policy with automatic elections disabled, going by the test name - TODO confirm against
// createRaftPolicy's parameter meaning
1051 context.setRaftPolicy(createRaftPolicy(false, false));
1053 follower = createBehavior(context);
1055 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1056 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
// Handling the timeout must return the very same Follower instance, i.e. no behavior switch.
1057 assertSame("handleMessage result", follower, newBehavior);
// Registers this node as the only server via ServerInfo(context.getId(), false), then expects an ElectionTimeout
// to leave the same Follower in place with a null leader id.
1061 public void testFollowerSchedulesElectionIfNonVoting() {
1062 MockRaftActorContext context = createActorContext();
// presumably the boolean is ServerInfo's voting flag, going by the test name - TODO confirm
1063 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
// Heartbeat interval of 100 ms with an election timeout factor of 1.
1064 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1065 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1066 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
// Constructed directly rather than through createBehavior; "leader" is presumably the initial leader id, which
// the final assertion expects to have been reset to null.
1068 follower = new Follower(context, "leader", (short)1);
1070 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1071 ElectionTimeout.class);
1072 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1073 assertSame("handleMessage result", follower, newBehavior);
1074 assertNull("Expected null leaderId", follower.getLeaderId());
// Sends an anonymous RaftRPC implementation to the follower and verifies that scheduleElection was invoked
// with some FiniteDuration.
1078 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1079 MockRaftActorContext context = createActorContext();
1080 follower = createBehavior(context);
1081 follower.handleMessage(leaderActor, new RaftRPC() {
1082 private static final long serialVersionUID = 1L;
1085 public long getTerm() {
// NOTE(review): verify() needs a Mockito mock or spy, so createBehavior presumably returns a spied
// Follower - confirm.
1089 verify(follower).scheduleElection(any(FiniteDuration.class));
// Sends a plain String message to the follower and verifies that scheduleElection was never invoked.
1093 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1094 MockRaftActorContext context = createActorContext();
1095 follower = createBehavior(context);
1096 follower.handleMessage(leaderActor, "non-raft-rpc");
// NOTE(review): verify() needs a Mockito mock or spy, so createBehavior presumably returns a spied
// Follower - confirm.
1097 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
// Runs a persistent MockRaftActor with a snapshot batch count of 2, sends it an AppendEntries holding the
// entries at indexes 0 and 1, and checks the saved snapshot and what is left in the in-memory journal.
1101 public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1102 String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
// Seed the in-memory journal with an UpdateElectionTerm(1, null) entry before the actor is created.
1105 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1107 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1108 config.setSnapshotBatchCount(2);
1109 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state, but the actor can only be created after the cohort has been handed to
// the builder; the AtomicReference is filled in once the actor exists.
1111 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1112 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1113 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1114 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1115 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1116 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1117 followerRaftActor.set(followerActorRef.underlyingActor());
1118 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register latches up front so the waits below can block until the snapshot save, the journal delete and
// one ApplyJournalEntries write have completed.
1120 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1121 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1122 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
// Two term-1 entries at indexes 0 and 1.
1124 List<ReplicatedLogEntry> entries = Arrays.asList(
1125 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1127 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1129 followerActorRef.tell(appendEntries, leaderActor);
1131 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1132 assertEquals("isSuccess", true, reply.isSuccess());
1134 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1136 InMemoryJournal.waitForDeleteMessagesComplete(id);
1137 InMemoryJournal.waitForWriteMessagesComplete(id);
1138 // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1139 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1140 // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1141 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1142 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1143 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1144 assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// The snapshot must hold both entries as applied state (last index and last applied index 1, term 1) with
// nothing left unapplied.
1146 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1147 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1148 assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1149 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1150 assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1151 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1152 MockRaftActor.fromState(snapshot.getState()));
// Runs a persistent MockRaftActor with a snapshot batch count of 2, sends it an AppendEntries holding the
// entries at indexes 0, 1 and 2, checks the saved snapshot and the remaining journal, then kills and re-creates
// the actor to check the state it comes back with.
1156 public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1157 String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
// Seed the in-memory journal with an UpdateElectionTerm(1, null) entry before the actor is created.
1160 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1162 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1163 config.setSnapshotBatchCount(2);
1164 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state, but the actor can only be created after the cohort has been handed to
// the builder; the AtomicReference is filled in once the actor exists.
1166 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1167 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1168 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1169 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1170 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1171 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1172 followerRaftActor.set(followerActorRef.underlyingActor());
1173 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register latches up front so the waits below can block until the snapshot save, the journal delete and
// one ApplyJournalEntries write have completed.
1175 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1176 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1177 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
// Three term-1 entries at indexes 0, 1 and 2.
1179 List<ReplicatedLogEntry> entries = Arrays.asList(
1180 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1181 newReplicatedLogEntry(1, 2, "three"));
1183 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1185 followerActorRef.tell(appendEntries, leaderActor);
1187 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1188 assertEquals("isSuccess", true, reply.isSuccess());
1190 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1192 InMemoryJournal.waitForDeleteMessagesComplete(id);
1193 InMemoryJournal.waitForWriteMessagesComplete(id);
1194 // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1195 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1196 // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1197 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1198 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1199 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1200 assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// The snapshot must hold all three entries as applied state (last index and last applied index 2, term 1)
// with nothing left unapplied.
1202 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1203 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1204 assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1205 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1206 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1207 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1208 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
// The actor's in-memory log must be empty, with its snapshot index moved up to 2.
1210 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1211 assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1213 // Reinstate the actor from persistence
1215 actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
// Re-create the actor from the same builder and id, and point the shared reference at the new instance.
1217 followerActorRef = actorFactory.createTestActor(builder.props()
1218 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1219 followerRaftActor.set(followerActorRef.underlyingActor());
1220 followerRaftActor.get().waitForInitializeBehaviorComplete();
// The re-created actor must come back with an empty log, last/applied/commit index 2 and all three payloads.
1222 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1223 assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1224 assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1225 assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1226 assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1227 entries.get(2).getData()), followerRaftActor.get().getState());
// Runs a persistent MockRaftActor with a snapshot batch count of 1, sends it an AppendEntries holding the
// entries at indexes 0, 1 and 2, and expects a snapshot whose applied state stops at index 0 while indexes 1 and
// 2 are carried as unapplied entries.
1231 public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1232 String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
// Seed the in-memory journal with an UpdateElectionTerm(1, null) entry before the actor is created.
1235 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1237 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1238 config.setSnapshotBatchCount(1);
1239 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state, but the actor can only be created after the cohort has been handed to
// the builder; the AtomicReference is filled in once the actor exists.
1241 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1242 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1243 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1244 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1245 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1246 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1247 followerRaftActor.set(followerActorRef.underlyingActor());
1248 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register latches up front so the waits below can block until the snapshot save, the journal delete and
// one ApplyJournalEntries write have completed.
1250 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1251 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1252 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
// Three term-1 entries at indexes 0, 1 and 2.
1254 List<ReplicatedLogEntry> entries = Arrays.asList(
1255 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1256 newReplicatedLogEntry(1, 2, "three"));
1258 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1260 followerActorRef.tell(appendEntries, leaderActor);
1262 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1263 assertEquals("isSuccess", true, reply.isSuccess());
1265 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1267 InMemoryJournal.waitForDeleteMessagesComplete(id);
1268 InMemoryJournal.waitForWriteMessagesComplete(id);
1269 // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1270 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1271 // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1272 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1273 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1274 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1275 assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// Only index 0 is part of the snapshot state; indexes 1 and 2 must be listed as unapplied, while the
// snapshot's last index still reflects the final entry (2).
1277 assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1278 assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1279 assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1280 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1281 assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1282 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1283 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1284 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1285 MockRaftActor.fromState(snapshot.getState()));
// Builds the RaftActorSnapshotCohort used by the capture-snapshot tests. Its createSnapshot answers actorRef
// with a CaptureSnapshotReply wrapping the current state of the actor held in followerRaftActor, and its
// deserializeSnapshot always throws UnsupportedOperationException.
//
// followerRaftActor is read only when createSnapshot runs, so callers may fill the reference in after this
// method has returned.
1288 @SuppressWarnings("checkstyle:IllegalCatch")
1289 private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1290 final AtomicReference<MockRaftActor> followerRaftActor) {
1291 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1293 public void createSnapshot(final ActorRef actorRef,
1294 final java.util.Optional<OutputStream> installSnapshotStream) {
// The reply is sent to actorRef with actorRef itself as the sender.
1296 actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1297 installSnapshotStream), actorRef);
1298 } catch (RuntimeException e) {
1300 } catch (Exception e) {
// Any other exception is rethrown wrapped in a RuntimeException.
1301 throw new RuntimeException(e);
1306 public void applySnapshot(final State snapshotState) {
1310 public State deserializeSnapshot(final ByteSource snapshotBytes) {
1311 throw new UnsupportedOperationException();
1314 return snapshotCohort;
1317 public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1318 int snapshotLength = bs.size();
1320 int size = chunkSize;
1321 if (chunkSize > snapshotLength) {
1322 size = snapshotLength;
1324 if (start + chunkSize > snapshotLength) {
1325 size = snapshotLength - start;
1329 byte[] nextChunk = new byte[size];
1330 bs.copyTo(nextChunk, start, 0, size);
1334 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1335 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1336 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1339 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1340 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1341 final boolean expForceInstallSnapshot) {
1343 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1344 AppendEntriesReply.class);
1346 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1347 assertEquals("getTerm", expTerm, reply.getTerm());
1348 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1349 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1350 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1351 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1352 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1356 private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1357 return new SimpleReplicatedLogEntry(index, term,
1358 new MockRaftActorContext.MockPayload(data));
1361 private ByteString createSnapshot() {
1362 HashMap<String, String> followerSnapshot = new HashMap<>();
1363 followerSnapshot.put("1", "A");
1364 followerSnapshot.put("2", "B");
1365 followerSnapshot.put("3", "C");
1367 return toByteString(followerSnapshot);
1371 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1372 final ActorRef actorRef, final RaftRPC rpc) {
1373 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1375 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1376 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
// Takes the first AppendEntriesReply collected by replyActor and asserts that it reports success.
1380 protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1381 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1382 assertEquals("isSuccess", true, reply.isSuccess());