2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.dispatch.Dispatchers;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.io.ByteSource;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import com.google.protobuf.ByteString;
34 import java.io.OutputStream;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Assert;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
48 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
55 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
56 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
57 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
59 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
61 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
63 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
65 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
66 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
71 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
72 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
73 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
74 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
75 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import scala.concurrent.duration.FiniteDuration;
79 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
81 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
82 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
84 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
85 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
87 private Follower follower;
89 private final short payloadVersion = 5;
93 public void tearDown() {
94 if (follower != null) {
102 protected Follower createBehavior(final RaftActorContext actorContext) {
103 return spy(new Follower(actorContext));
107 protected MockRaftActorContext createActorContext() {
108 return createActorContext(followerActor);
112 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
113 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
114 context.setPayloadVersion(payloadVersion);
119 public void testThatAnElectionTimeoutIsTriggered() {
120 MockRaftActorContext actorContext = createActorContext();
121 follower = new Follower(actorContext);
123 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
124 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
128 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
129 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
131 MockRaftActorContext context = createActorContext();
132 follower = new Follower(context);
134 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
135 TimeUnit.MILLISECONDS);
136 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
138 assertTrue(raftBehavior instanceof Candidate);
142 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
143 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
145 MockRaftActorContext context = createActorContext();
146 ((DefaultConfigParamsImpl) context.getConfigParams())
147 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
148 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
150 follower = new Follower(context);
151 context.setCurrentBehavior(follower);
153 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
154 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
155 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
158 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
159 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
160 assertTrue(raftBehavior instanceof Follower);
162 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
163 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
164 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
167 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
168 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
169 assertTrue(raftBehavior instanceof Follower);
173 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
174 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
176 MockRaftActorContext context = createActorContext();
178 context.getTermInformation().update(term, null);
180 follower = createBehavior(context);
182 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
184 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
186 assertEquals("isVoteGranted", true, reply.isVoteGranted());
187 assertEquals("getTerm", term, reply.getTerm());
188 verify(follower).scheduleElection(any(FiniteDuration.class));
192 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
193 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
195 MockRaftActorContext context = createActorContext();
197 context.getTermInformation().update(term, "test");
199 follower = createBehavior(context);
201 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
203 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
205 assertEquals("isVoteGranted", false, reply.isVoteGranted());
206 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
211 public void testHandleFirstAppendEntries() {
212 logStart("testHandleFirstAppendEntries");
214 MockRaftActorContext context = createActorContext();
215 context.getReplicatedLog().clear(0,2);
216 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
217 context.getReplicatedLog().setSnapshotIndex(99);
219 List<ReplicatedLogEntry> entries = Arrays.asList(
220 newReplicatedLogEntry(2, 101, "foo"));
222 Assert.assertEquals(1, context.getReplicatedLog().size());
224 // The new commitIndex is 101
225 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
227 follower = createBehavior(context);
228 follower.handleMessage(leaderActor, appendEntries);
230 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
231 FollowerInitialSyncUpStatus.class);
232 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
234 assertFalse(syncStatus.isInitialSyncDone());
235 assertTrue("append entries reply should be true", reply.isSuccess());
239 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
240 logStart("testHandleFirstAppendEntries");
242 MockRaftActorContext context = createActorContext();
244 List<ReplicatedLogEntry> entries = Arrays.asList(
245 newReplicatedLogEntry(2, 101, "foo"));
247 // The new commitIndex is 101
248 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
250 follower = createBehavior(context);
251 follower.handleMessage(leaderActor, appendEntries);
253 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
254 FollowerInitialSyncUpStatus.class);
255 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
257 assertFalse(syncStatus.isInitialSyncDone());
258 assertFalse("append entries reply should be false", reply.isSuccess());
262 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
263 logStart("testHandleFirstAppendEntries");
265 MockRaftActorContext context = createActorContext();
266 context.getReplicatedLog().clear(0,2);
267 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
268 context.getReplicatedLog().setSnapshotIndex(99);
270 List<ReplicatedLogEntry> entries = Arrays.asList(
271 newReplicatedLogEntry(2, 101, "foo"));
273 // The new commitIndex is 101
274 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
276 follower = createBehavior(context);
277 follower.handleMessage(leaderActor, appendEntries);
279 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
280 FollowerInitialSyncUpStatus.class);
281 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
283 assertFalse(syncStatus.isInitialSyncDone());
284 assertTrue("append entries reply should be true", reply.isSuccess());
288 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
289 logStart("testHandleFirstAppendEntries");
291 MockRaftActorContext context = createActorContext();
292 context.getReplicatedLog().clear(0,2);
293 context.getReplicatedLog().setSnapshotIndex(100);
295 List<ReplicatedLogEntry> entries = Arrays.asList(
296 newReplicatedLogEntry(2, 101, "foo"));
298 // The new commitIndex is 101
299 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
301 follower = createBehavior(context);
302 follower.handleMessage(leaderActor, appendEntries);
304 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
305 FollowerInitialSyncUpStatus.class);
306 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
308 assertFalse(syncStatus.isInitialSyncDone());
309 assertTrue("append entries reply should be true", reply.isSuccess());
313 public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
315 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
317 MockRaftActorContext context = createActorContext();
318 context.getReplicatedLog().clear(0,2);
319 context.getReplicatedLog().setSnapshotIndex(100);
321 List<ReplicatedLogEntry> entries = Arrays.asList(
322 newReplicatedLogEntry(2, 105, "foo"));
324 // The new commitIndex is 105
325 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
327 follower = createBehavior(context);
328 follower.handleMessage(leaderActor, appendEntries);
330 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
331 FollowerInitialSyncUpStatus.class);
332 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
334 assertFalse(syncStatus.isInitialSyncDone());
335 assertFalse("append entries reply should be false", reply.isSuccess());
339 public void testHandleSyncUpAppendEntries() {
340 logStart("testHandleSyncUpAppendEntries");
342 MockRaftActorContext context = createActorContext();
344 List<ReplicatedLogEntry> entries = Arrays.asList(
345 newReplicatedLogEntry(2, 101, "foo"));
347 // The new commitIndex is 101
348 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
350 follower = createBehavior(context);
351 follower.handleMessage(leaderActor, appendEntries);
353 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
354 FollowerInitialSyncUpStatus.class);
356 assertFalse(syncStatus.isInitialSyncDone());
358 // Clear all the messages
359 followerActor.underlyingActor().clear();
361 context.setLastApplied(101);
362 context.setCommitIndex(101);
363 setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
365 entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
367 // The new commitIndex is 102
368 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
369 follower.handleMessage(leaderActor, appendEntries);
371 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
373 assertTrue(syncStatus.isInitialSyncDone());
375 followerActor.underlyingActor().clear();
377 // Sending the same message again should not generate another message
379 follower.handleMessage(leaderActor, appendEntries);
381 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
383 assertNull(syncStatus);
387 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
388 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
390 MockRaftActorContext context = createActorContext();
392 List<ReplicatedLogEntry> entries = Arrays.asList(
393 newReplicatedLogEntry(2, 101, "foo"));
395 // The new commitIndex is 101
396 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
398 follower = createBehavior(context);
399 follower.handleMessage(leaderActor, appendEntries);
401 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
402 FollowerInitialSyncUpStatus.class);
404 assertFalse(syncStatus.isInitialSyncDone());
406 // Clear all the messages
407 followerActor.underlyingActor().clear();
409 context.setLastApplied(100);
410 setLastLogEntry(context, 1, 100,
411 new MockRaftActorContext.MockPayload(""));
413 entries = Arrays.asList(
414 newReplicatedLogEntry(2, 101, "foo"));
416 // leader-2 is becoming the leader now and it says the commitIndex is 46
417 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
418 follower.handleMessage(leaderActor, appendEntries);
420 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
422 // We get a new message saying initial status is not done
423 assertFalse(syncStatus.isInitialSyncDone());
427 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
428 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
430 MockRaftActorContext context = createActorContext();
432 List<ReplicatedLogEntry> entries = Arrays.asList(
433 newReplicatedLogEntry(2, 101, "foo"));
435 // The new commitIndex is 101
436 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
438 follower = createBehavior(context);
439 follower.handleMessage(leaderActor, appendEntries);
441 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
442 FollowerInitialSyncUpStatus.class);
444 assertFalse(syncStatus.isInitialSyncDone());
446 // Clear all the messages
447 followerActor.underlyingActor().clear();
449 context.setLastApplied(101);
450 context.setCommitIndex(101);
451 setLastLogEntry(context, 1, 101,
452 new MockRaftActorContext.MockPayload(""));
454 entries = Arrays.asList(
455 newReplicatedLogEntry(2, 101, "foo"));
457 // The new commitIndex is 102
458 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
459 follower.handleMessage(leaderActor, appendEntries);
461 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
463 assertTrue(syncStatus.isInitialSyncDone());
465 // Clear all the messages
466 followerActor.underlyingActor().clear();
468 context.setLastApplied(100);
469 setLastLogEntry(context, 1, 100,
470 new MockRaftActorContext.MockPayload(""));
472 entries = Arrays.asList(
473 newReplicatedLogEntry(2, 101, "foo"));
475 // leader-2 is becoming the leader now and it says the commitIndex is 46
476 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
477 follower.handleMessage(leaderActor, appendEntries);
479 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
481 // We get a new message saying initial status is not done
482 assertFalse(syncStatus.isInitialSyncDone());
486 * This test verifies that when an AppendEntries RPC is received by a RaftActor
487 * with a commitIndex that is greater than what has been applied to the
488 * state machine of the RaftActor, the RaftActor applies the state and
489 * sets its current applied state to the commitIndex of the sender.
492 public void testHandleAppendEntriesWithNewerCommitIndex() {
493 logStart("testHandleAppendEntriesWithNewerCommitIndex");
495 MockRaftActorContext context = createActorContext();
497 context.setLastApplied(100);
498 setLastLogEntry(context, 1, 100,
499 new MockRaftActorContext.MockPayload(""));
500 context.getReplicatedLog().setSnapshotIndex(99);
502 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
503 newReplicatedLogEntry(2, 101, "foo"));
505 // The new commitIndex is 101
506 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
508 follower = createBehavior(context);
509 follower.handleMessage(leaderActor, appendEntries);
511 assertEquals("getLastApplied", 101L, context.getLastApplied());
515 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
516 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
517 * then the RaftActor does not change its state and it returns a failure.
520 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
521 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
523 MockRaftActorContext context = createActorContext();
525 // First set the receiver's term to a lower number
526 context.getTermInformation().update(95, "test");
528 // AppendEntries is now sent with a bigger term
529 // this will set the receiver's term to be the same as the sender's term
530 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
533 follower = createBehavior(context);
535 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
537 Assert.assertSame(follower, newBehavior);
539 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
540 AppendEntriesReply.class);
542 assertEquals("isSuccess", false, reply.isSuccess());
546 * This test verifies that when a new AppendEntries message is received with
547 * new entries and the logs of the sender and receiver match that the new
548 * entries get added to the log and the log is incremented by the number of
549 * entries received in appendEntries.
552 public void testHandleAppendEntriesAddNewEntries() {
553 logStart("testHandleAppendEntriesAddNewEntries");
555 MockRaftActorContext context = createActorContext();
557 // First set the receivers term to lower number
558 context.getTermInformation().update(1, "test");
560 // Prepare the receivers log
561 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
562 log.append(newReplicatedLogEntry(1, 0, "zero"));
563 log.append(newReplicatedLogEntry(1, 1, "one"));
564 log.append(newReplicatedLogEntry(1, 2, "two"));
566 context.setReplicatedLog(log);
568 // Prepare the entries to be sent with AppendEntries
569 List<ReplicatedLogEntry> entries = new ArrayList<>();
570 entries.add(newReplicatedLogEntry(1, 3, "three"));
571 entries.add(newReplicatedLogEntry(1, 4, "four"));
573 // Send appendEntries with the same term as was set on the receiver
574 // before the new behavior was created (1 in this case)
575 // This will not work for a Candidate because as soon as a Candidate
576 // is created it increments the term
577 short leaderPayloadVersion = 10;
578 String leaderId = "leader-1";
579 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
581 follower = createBehavior(context);
583 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
585 Assert.assertSame(follower, newBehavior);
587 assertEquals("Next index", 5, log.last().getIndex() + 1);
588 assertEquals("Entry 3", entries.get(0), log.get(3));
589 assertEquals("Entry 4", entries.get(1), log.get(4));
591 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
592 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
594 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
598 * This test verifies that when a new AppendEntries message is received with
599 * new entries and the logs of the sender and receiver are out-of-sync that
600 * the log is first corrected by removing the out of sync entries from the
601 * log and then adding in the new entries sent with the AppendEntries message.
604 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
605 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
607 MockRaftActorContext context = createActorContext();
609 // First set the receivers term to lower number
610 context.getTermInformation().update(1, "test");
612 // Prepare the receivers log
613 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
614 log.append(newReplicatedLogEntry(1, 0, "zero"));
615 log.append(newReplicatedLogEntry(1, 1, "one"));
616 log.append(newReplicatedLogEntry(1, 2, "two"));
618 context.setReplicatedLog(log);
620 // Prepare the entries to be sent with AppendEntries
621 List<ReplicatedLogEntry> entries = new ArrayList<>();
622 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
623 entries.add(newReplicatedLogEntry(2, 3, "three"));
625 // Send appendEntries with a newer term (2) than the one that was set on the receiver
626 // before the new behavior was created (1 in this case)
627 // This will not work for a Candidate because as soon as a Candidate
628 // is created it increments the term
629 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
631 follower = createBehavior(context);
633 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
635 Assert.assertSame(follower, newBehavior);
637 // The entry at index 2 will be found out-of-sync with the leader
638 // and will be removed
639 // Then the two new entries will be added to the log
640 // Thus making the log to have 4 entries
641 assertEquals("Next index", 4, log.last().getIndex() + 1);
642 //assertEquals("Entry 2", entries.get(0), log.get(2));
644 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
646 // Check that the entry at index 2 has the new data
647 assertEquals("Entry 2", entries.get(0), log.get(2));
649 assertEquals("Entry 3", entries.get(1), log.get(3));
651 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
655 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
656 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
658 MockRaftActorContext context = createActorContext();
660 // First set the receivers term to lower number
661 context.getTermInformation().update(1, "test");
663 // Prepare the receivers log
664 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
665 log.append(newReplicatedLogEntry(1, 0, "zero"));
666 log.append(newReplicatedLogEntry(1, 1, "one"));
667 log.append(newReplicatedLogEntry(1, 2, "two"));
669 context.setReplicatedLog(log);
671 // Prepare the entries to be sent with AppendEntries
672 List<ReplicatedLogEntry> entries = new ArrayList<>();
673 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
674 entries.add(newReplicatedLogEntry(2, 3, "three"));
676 // Send appendEntries with a newer term (2) than the one that was set on the receiver
677 // before the new behavior was created (1 in this case)
678 // This will not work for a Candidate because as soon as a Candidate
679 // is created it increments the term
680 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
682 context.setRaftPolicy(createRaftPolicy(false, true));
683 follower = createBehavior(context);
685 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
687 Assert.assertSame(follower, newBehavior);
689 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
693 public void testHandleAppendEntriesPreviousLogEntryMissing() {
694 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
696 final MockRaftActorContext context = createActorContext();
698 // Prepare the receivers log
699 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
700 log.append(newReplicatedLogEntry(1, 0, "zero"));
701 log.append(newReplicatedLogEntry(1, 1, "one"));
702 log.append(newReplicatedLogEntry(1, 2, "two"));
704 context.setReplicatedLog(log);
706 // Prepare the entries to be sent with AppendEntries
707 List<ReplicatedLogEntry> entries = new ArrayList<>();
708 entries.add(newReplicatedLogEntry(1, 4, "four"));
710 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
712 follower = createBehavior(context);
714 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
716 Assert.assertSame(follower, newBehavior);
718 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
722 public void testHandleAppendEntriesWithExistingLogEntry() {
723 logStart("testHandleAppendEntriesWithExistingLogEntry");
725 MockRaftActorContext context = createActorContext();
727 context.getTermInformation().update(1, "test");
729 // Prepare the receivers log
730 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
731 log.append(newReplicatedLogEntry(1, 0, "zero"));
732 log.append(newReplicatedLogEntry(1, 1, "one"));
734 context.setReplicatedLog(log);
736 // Send the last entry again.
737 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
739 follower = createBehavior(context);
741 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
743 assertEquals("Next index", 2, log.last().getIndex() + 1);
744 assertEquals("Entry 1", entries.get(0), log.get(1));
746 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
748 // Send the last entry again and also a new one.
750 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
752 leaderActor.underlyingActor().clear();
753 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
755 assertEquals("Next index", 3, log.last().getIndex() + 1);
756 assertEquals("Entry 1", entries.get(0), log.get(1));
757 assertEquals("Entry 2", entries.get(1), log.get(2));
759 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
763 public void testHandleAppendEntriesAfterInstallingSnapshot() {
764 logStart("testHandleAppendAfterInstallingSnapshot");
766 MockRaftActorContext context = createActorContext();
768 // Prepare the receivers log
769 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
771 // Set up a log as if it has been snapshotted
772 log.setSnapshotIndex(3);
773 log.setSnapshotTerm(1);
775 context.setReplicatedLog(log);
777 // Prepare the entries to be sent with AppendEntries
778 List<ReplicatedLogEntry> entries = new ArrayList<>();
779 entries.add(newReplicatedLogEntry(1, 4, "four"));
781 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
783 follower = createBehavior(context);
785 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
787 Assert.assertSame(follower, newBehavior);
789 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
794 * This test verifies that when InstallSnapshot is received by
795 * the follower it is applied correctly.
798 public void testHandleInstallSnapshot() {
799 logStart("testHandleInstallSnapshot");
801 MockRaftActorContext context = createActorContext();
802 context.getTermInformation().update(1, "leader");
804 follower = createBehavior(context);
806 ByteString bsSnapshot = createSnapshot();
808 int snapshotLength = bsSnapshot.size();
810 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
811 int lastIncludedIndex = 1;
813 InstallSnapshot lastInstallSnapshot = null;
815 for (int i = 0; i < totalChunks; i++) {
816 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
817 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
818 chunkData, chunkIndex, totalChunks);
819 follower.handleMessage(leaderActor, lastInstallSnapshot);
820 offset = offset + 50;
825 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
826 ApplySnapshot.class);
827 Snapshot snapshot = applySnapshot.getSnapshot();
828 assertNotNull(lastInstallSnapshot);
829 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
830 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
831 snapshot.getLastAppliedTerm());
832 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
833 snapshot.getLastAppliedIndex());
834 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
835 assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
836 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
837 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
838 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
839 applySnapshot.getCallback().onSuccess();
841 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
842 leaderActor, InstallSnapshotReply.class);
843 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
846 for (InstallSnapshotReply reply: replies) {
847 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
848 assertEquals("getTerm", 1, reply.getTerm());
849 assertEquals("isSuccess", true, reply.isSuccess());
850 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
853 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
857 * Verify that when an AppendEntries is sent to a follower during a snapshot install
858 * the Follower short-circuits the processing of the AppendEntries message.
861 public void testReceivingAppendEntriesDuringInstallSnapshot() {
862 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
864 MockRaftActorContext context = createActorContext();
866 follower = createBehavior(context);
868 ByteString bsSnapshot = createSnapshot();
869 int snapshotLength = bsSnapshot.size();
871 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
872 int lastIncludedIndex = 1;
874 // Check that snapshot installation is not in progress
875 assertNull(follower.getSnapshotTracker());
877 // Make sure that we have more than 1 chunk to send
878 assertTrue(totalChunks > 1);
880 // Send an install snapshot with the first chunk to start the process of installing a snapshot
881 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
882 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
883 chunkData, 1, totalChunks));
885 // Check if snapshot installation is in progress now
886 assertNotNull(follower.getSnapshotTracker());
888 // Send an append entry
889 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
890 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
892 follower.handleMessage(leaderActor, appendEntries);
894 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
895 assertEquals("isSuccess", true, reply.isSuccess());
896 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
897 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
898 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
900 assertNotNull(follower.getSnapshotTracker());
904 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
905 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
907 MockRaftActorContext context = createActorContext();
909 follower = createBehavior(context);
911 ByteString bsSnapshot = createSnapshot();
912 int snapshotLength = bsSnapshot.size();
914 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
915 int lastIncludedIndex = 1;
917 // Check that snapshot installation is not in progress
918 assertNull(follower.getSnapshotTracker());
920 // Make sure that we have more than 1 chunk to send
921 assertTrue(totalChunks > 1);
923 // Send an install snapshot with the first chunk to start the process of installing a snapshot
924 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
925 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
926 chunkData, 1, totalChunks));
928 // Check if snapshot installation is in progress now
929 assertNotNull(follower.getSnapshotTracker());
931 // Send appendEntries with a new term and leader.
932 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
933 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
935 follower.handleMessage(leaderActor, appendEntries);
937 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
938 assertEquals("isSuccess", true, reply.isSuccess());
939 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
940 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
941 assertEquals("getTerm", 2, reply.getTerm());
943 assertNull(follower.getSnapshotTracker());
947 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
948 logStart("testInitialSyncUpWithHandleInstallSnapshot");
950 MockRaftActorContext context = createActorContext();
951 context.setCommitIndex(-1);
953 follower = createBehavior(context);
955 ByteString bsSnapshot = createSnapshot();
957 int snapshotLength = bsSnapshot.size();
959 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
960 int lastIncludedIndex = 1;
962 InstallSnapshot lastInstallSnapshot = null;
964 for (int i = 0; i < totalChunks; i++) {
965 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
966 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
967 chunkData, chunkIndex, totalChunks);
968 follower.handleMessage(leaderActor, lastInstallSnapshot);
969 offset = offset + 50;
974 FollowerInitialSyncUpStatus syncStatus =
975 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
977 assertFalse(syncStatus.isInitialSyncDone());
979 // Clear all the messages
980 followerActor.underlyingActor().clear();
982 context.setLastApplied(101);
983 context.setCommitIndex(101);
984 setLastLogEntry(context, 1, 101,
985 new MockRaftActorContext.MockPayload(""));
987 List<ReplicatedLogEntry> entries = Arrays.asList(
988 newReplicatedLogEntry(2, 101, "foo"));
990 // The new commitIndex is 101
991 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
992 follower.handleMessage(leaderActor, appendEntries);
994 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
996 assertTrue(syncStatus.isInitialSyncDone());
1000 public void testHandleOutOfSequenceInstallSnapshot() {
1001 logStart("testHandleOutOfSequenceInstallSnapshot");
1003 MockRaftActorContext context = createActorContext();
1005 follower = createBehavior(context);
1007 ByteString bsSnapshot = createSnapshot();
1009 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1010 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1011 follower.handleMessage(leaderActor, installSnapshot);
1013 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1014 InstallSnapshotReply.class);
1016 assertEquals("isSuccess", false, reply.isSuccess());
1017 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1018 assertEquals("getTerm", 1, reply.getTerm());
1019 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1021 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1025 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1026 MockRaftActorContext context = createActorContext();
1028 Stopwatch stopwatch = Stopwatch.createStarted();
1030 follower = createBehavior(context);
1032 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1034 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1036 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1038 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1039 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1043 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1044 MockRaftActorContext context = createActorContext();
1045 context.setConfigParams(new DefaultConfigParamsImpl() {
1047 public FiniteDuration getElectionTimeOutInterval() {
1048 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1052 context.setRaftPolicy(createRaftPolicy(false, false));
1054 follower = createBehavior(context);
1056 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1057 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1058 assertSame("handleMessage result", follower, newBehavior);
1062 public void testFollowerSchedulesElectionIfNonVoting() {
1063 MockRaftActorContext context = createActorContext();
1064 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1065 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1066 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1067 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1069 follower = new Follower(context, "leader", (short)1);
1071 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1072 ElectionTimeout.class);
1073 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1074 assertSame("handleMessage result", follower, newBehavior);
1075 assertNull("Expected null leaderId", follower.getLeaderId());
1079 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1080 MockRaftActorContext context = createActorContext();
1081 follower = createBehavior(context);
1082 follower.handleMessage(leaderActor, new RaftRPC() {
1083 private static final long serialVersionUID = 1L;
1086 public long getTerm() {
1090 verify(follower).scheduleElection(any(FiniteDuration.class));
1094 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1095 MockRaftActorContext context = createActorContext();
1096 follower = createBehavior(context);
1097 follower.handleMessage(leaderActor, "non-raft-rpc");
1098 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1102 public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1103 String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1106 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1108 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1109 config.setSnapshotBatchCount(2);
1110 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1112 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1113 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1114 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1115 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1116 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1117 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1118 followerRaftActor.set(followerActorRef.underlyingActor());
1119 followerRaftActor.get().waitForInitializeBehaviorComplete();
1121 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1122 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1123 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1125 List<ReplicatedLogEntry> entries = Arrays.asList(
1126 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1128 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1130 followerActorRef.tell(appendEntries, leaderActor);
1132 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1133 assertEquals("isSuccess", true, reply.isSuccess());
1135 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1137 InMemoryJournal.waitForDeleteMessagesComplete(id);
1138 InMemoryJournal.waitForWriteMessagesComplete(id);
1139 // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1140 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1141 // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1142 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1143 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1144 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1145 assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1147 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1148 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1149 assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1150 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1151 assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1152 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1153 MockRaftActor.fromState(snapshot.getState()));
1157 public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1158 String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1161 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1163 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1164 config.setSnapshotBatchCount(2);
1165 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1167 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1168 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1169 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1170 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1171 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1172 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1173 followerRaftActor.set(followerActorRef.underlyingActor());
1174 followerRaftActor.get().waitForInitializeBehaviorComplete();
1176 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1177 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1178 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1180 List<ReplicatedLogEntry> entries = Arrays.asList(
1181 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1182 newReplicatedLogEntry(1, 2, "three"));
1184 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1186 followerActorRef.tell(appendEntries, leaderActor);
1188 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1189 assertEquals("isSuccess", true, reply.isSuccess());
1191 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1193 InMemoryJournal.waitForDeleteMessagesComplete(id);
1194 InMemoryJournal.waitForWriteMessagesComplete(id);
1195 // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1196 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1197 // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1198 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1199 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1200 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1201 assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1203 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1204 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1205 assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1206 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1207 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1208 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1209 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1211 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1212 assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1214 // Reinstate the actor from persistence
1216 actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
1218 followerActorRef = actorFactory.createTestActor(builder.props()
1219 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1220 followerRaftActor.set(followerActorRef.underlyingActor());
1221 followerRaftActor.get().waitForInitializeBehaviorComplete();
1223 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1224 assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1225 assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1226 assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1227 assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1228 entries.get(2).getData()), followerRaftActor.get().getState());
1232 public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1233 String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1236 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1238 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1239 config.setSnapshotBatchCount(1);
1240 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1242 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1243 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1244 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1245 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1246 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1247 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1248 followerRaftActor.set(followerActorRef.underlyingActor());
1249 followerRaftActor.get().waitForInitializeBehaviorComplete();
1251 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1252 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1253 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1255 List<ReplicatedLogEntry> entries = Arrays.asList(
1256 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1257 newReplicatedLogEntry(1, 2, "three"));
1259 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1261 followerActorRef.tell(appendEntries, leaderActor);
1263 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1264 assertEquals("isSuccess", true, reply.isSuccess());
1266 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1268 InMemoryJournal.waitForDeleteMessagesComplete(id);
1269 InMemoryJournal.waitForWriteMessagesComplete(id);
1270 // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1271 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1272 // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1273 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1274 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1275 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1276 assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1278 assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1279 assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1280 assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1281 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1282 assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1283 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1284 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1285 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1286 MockRaftActor.fromState(snapshot.getState()));
1289 @SuppressWarnings("checkstyle:IllegalCatch")
1290 private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1291 final AtomicReference<MockRaftActor> followerRaftActor) {
1292 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1294 public void createSnapshot(final ActorRef actorRef,
1295 final java.util.Optional<OutputStream> installSnapshotStream) {
1297 actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1298 installSnapshotStream), actorRef);
1299 } catch (RuntimeException e) {
1301 } catch (Exception e) {
1302 throw new RuntimeException(e);
1307 public void applySnapshot(final State snapshotState) {
1311 public State deserializeSnapshot(final ByteSource snapshotBytes) {
1312 throw new UnsupportedOperationException();
1315 return snapshotCohort;
1318 public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1319 int snapshotLength = bs.size();
1321 int size = chunkSize;
1322 if (chunkSize > snapshotLength) {
1323 size = snapshotLength;
1325 if (start + chunkSize > snapshotLength) {
1326 size = snapshotLength - start;
1330 byte[] nextChunk = new byte[size];
1331 bs.copyTo(nextChunk, start, 0, size);
1335 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1336 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1337 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1340 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1341 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1342 final boolean expForceInstallSnapshot) {
1344 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1345 AppendEntriesReply.class);
1347 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1348 assertEquals("getTerm", expTerm, reply.getTerm());
1349 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1350 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1351 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1352 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1353 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1357 private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1358 return new SimpleReplicatedLogEntry(index, term,
1359 new MockRaftActorContext.MockPayload(data));
1362 private ByteString createSnapshot() {
1363 HashMap<String, String> followerSnapshot = new HashMap<>();
1364 followerSnapshot.put("1", "A");
1365 followerSnapshot.put("2", "B");
1366 followerSnapshot.put("3", "C");
1368 return toByteString(followerSnapshot);
1372 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1373 final ActorRef actorRef, final RaftRPC rpc) {
1374 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1376 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1377 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1381 protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor) {
1382 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1383 assertEquals("isSuccess", true, reply.isSuccess());