2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.verify;
23 import akka.actor.ActorRef;
24 import akka.dispatch.Dispatchers;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.base.Optional;
29 import com.google.common.base.Stopwatch;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.io.ByteSource;
33 import com.google.common.util.concurrent.Uninterruptibles;
34 import java.io.OutputStream;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Assert;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
48 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
51 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
52 import org.opendaylight.controller.cluster.raft.RaftActorContext;
53 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
54 import org.opendaylight.controller.cluster.raft.RaftVersions;
55 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
56 import org.opendaylight.controller.cluster.raft.VotingState;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
60 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
61 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
65 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
67 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
68 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
69 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
70 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
71 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
72 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
73 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
75 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
76 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
77 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
78 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
79 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
80 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
81 import scala.concurrent.duration.FiniteDuration;
83 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
// MessageCollectorActor created under a generated "follower" id; tests read the messages it collects
// (FollowerInitialSyncUpStatus, TimeoutNow, ...) via MessageCollectorActor.expectFirstMatching.
85 private final ActorRef followerActor = actorFactory.createActor(
86 MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
// MessageCollectorActor created under a generated "leader" id; passed as the sender to handleMessage,
// and the tests look for the follower's replies in its collected messages.
88 private final ActorRef leaderActor = actorFactory.createActor(
89 MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
// Behavior under test for the current test method; tearDown checks it for null.
91 private Follower follower;
// Payload version applied to every context built by createActorContext(ActorRef).
93 private final short payloadVersion = 5;
// Per-test cleanup: acts only when a test actually assigned a Follower.
// NOTE(review): the body of the null check is not visible in this view - presumably it closes the
// follower; confirm against the full file.
97 public void tearDown() {
98 if (follower != null) {
// Builds the behavior under test: a Mockito spy wrapped around a new Follower for the given context,
// so tests can verify() interactions such as scheduleElection on it.
106 protected Follower createBehavior(final RaftActorContext actorContext) {
107 return spy(new Follower(actorContext));
// Convenience overload: builds the mock context around the shared followerActor.
111 protected MockRaftActorContext createActorContext() {
112 return createActorContext(followerActor);
// Builds a MockRaftActorContext with id "follower" backed by actorRef, stamps it with payloadVersion,
// and installs a peer address resolver that maps every peer id to leaderActor's path.
// NOTE(review): the statement returning the context is not visible in this view.
116 protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
117 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
118 context.setPayloadVersion(payloadVersion);
// The cast assumes the mock context's config params are a DefaultConfigParamsImpl.
119 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
120 peerId -> leaderActor.path().toString());
// Creates a plain (non-spied) Follower and waits for a TimeoutNow message to show up at followerActor.
125 public void testThatAnElectionTimeoutIsTriggered() {
126 MockRaftActorContext actorContext = createActorContext();
127 follower = new Follower(actorContext);
// $times(6) is Scala's encoded `*` operator: the value passed is six election-timeout intervals in
// milliseconds (presumably the maximum time expectFirstMatching waits - confirm against the helper).
129 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
130 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
// With no leader message ever delivered, sleeps one election-timeout interval and then hands the
// follower an ElectionTimeout; the behavior returned by handleMessage must be a Candidate.
134 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
135 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
137 MockRaftActorContext context = createActorContext();
138 follower = new Follower(context);
// Let a full election-timeout interval elapse before delivering the timeout message.
140 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
141 TimeUnit.MILLISECONDS);
142 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
144 assertTrue(raftBehavior instanceof Candidate);
// Delivers an AppendEntries from "leader" shortly before each of two ElectionTimeout messages and
// asserts that handleMessage keeps returning a Follower both times.
// NOTE(review): both AppendEntries constructor calls continue on lines not visible in this view.
148 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
149 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
151 MockRaftActorContext context = createActorContext();
// Heartbeat of 100 ms with an election timeout factor of 4; the sleeps below are expressed relative
// to the resulting election-timeout interval.
152 ((DefaultConfigParamsImpl) context.getConfigParams())
153 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
154 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
156 follower = new Follower(context);
157 context.setCurrentBehavior(follower);
// Round 1: wait until 100 ms before the interval ends, then deliver a leader message.
159 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
160 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
161 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
// 130 ms later the election timeout is delivered; the follower is expected to remain a Follower.
164 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
165 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
166 assertTrue(raftBehavior instanceof Follower);
// Round 2: same pattern with different offsets (interval - 150 ms, then 200 ms).
168 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
169 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
170 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
173 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
174 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
175 assertTrue(raftBehavior instanceof Follower);
// The follower's current term equals the RequestVote term and no vote has been recorded (votedFor is
// null): the reply must grant the vote, carry the same term, and scheduleElection must be invoked.
// NOTE(review): `term` is used but its declaration is not visible in this view.
179 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
180 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
182 MockRaftActorContext context = createActorContext();
// Record the term with a null votedFor.
184 context.getTermInformation().update(term, null);
186 follower = createBehavior(context);
188 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
// The reply is looked up in the messages collected by leaderActor (the sender passed above).
190 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
192 assertEquals("isVoteGranted", true, reply.isVoteGranted());
193 assertEquals("getTerm", term, reply.getTerm());
// createBehavior returns a spy, which is what makes this verification possible.
194 verify(follower).scheduleElection(any(FiniteDuration.class));
// votedFor is already "test" for the current term, so a RequestVote from "candidate" in that same
// term must be refused and scheduleElection must never be invoked.
// NOTE(review): `term` is used but its declaration is not visible in this view.
198 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
199 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
201 MockRaftActorContext context = createActorContext();
// Record a vote for "test" in this term.
203 context.getTermInformation().update(term, "test");
205 follower = createBehavior(context);
207 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
209 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
211 assertEquals("isVoteGranted", false, reply.isVoteGranted());
212 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
// First AppendEntries seen by a new follower whose log holds a single entry (term 1, index 100) with
// snapshot index 99. The message names index 100 / term 1 as the previous entry and carries entry 101.
// Expects an initial-sync status of "not done" at followerActor and a successful reply at leaderActor.
217 public void testHandleFirstAppendEntries() {
218 logStart("testHandleFirstAppendEntries");
220 MockRaftActorContext context = createActorContext();
// clear(0, 2) presumably drops entries the mock context pre-populates - confirm against MockRaftActorContext.
221 context.getReplicatedLog().clear(0,2);
222 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
223 context.getReplicatedLog().setSnapshotIndex(99);
225 List<ReplicatedLogEntry> entries = Arrays.asList(
226 newReplicatedLogEntry(2, 101, "foo"));
// Sanity check: only the entry appended above is in the log.
228 Assert.assertEquals(1, context.getReplicatedLog().size());
230 // The new commitIndex is 101
231 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
233 follower = createBehavior(context);
234 follower.handleMessage(leaderActor, appendEntries);
236 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
237 FollowerInitialSyncUpStatus.class);
238 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
240 assertFalse(syncStatus.isInitialSyncDone());
241 assertTrue("append entries reply should be true", reply.isSuccess());
// First AppendEntries seen by a new follower when the message carries -1 for both the previous log
// index and term, handled against the context exactly as createActorContext() builds it.
// Expects an initial-sync status of "not done" at followerActor and an unsuccessful reply at leaderActor.
// The logStart label now names this test; it previously repeated "testHandleFirstAppendEntries".
245 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
246 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOne");
248 MockRaftActorContext context = createActorContext();
250 List<ReplicatedLogEntry> entries = Arrays.asList(
251 newReplicatedLogEntry(2, 101, "foo"));
253 // The new commitIndex is 101
254 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
256 follower = createBehavior(context);
257 follower.handleMessage(leaderActor, appendEntries);
259 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
260 FollowerInitialSyncUpStatus.class);
261 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
263 assertFalse(syncStatus.isInitialSyncDone());
264 assertFalse("append entries reply should be false", reply.isSuccess());
// Same -1/-1 previous-entry message as the PrevIndexMinusOne test, but here the follower's log holds
// an entry at index 100 (snapshot index 99), matching the 100 passed after the commit index.
// Expects initial sync "not done" and a successful reply.
268 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
269 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
271 MockRaftActorContext context = createActorContext();
// Replace the context's existing log content with a single entry at index 100.
272 context.getReplicatedLog().clear(0,2);
273 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
274 context.getReplicatedLog().setSnapshotIndex(99);
276 List<ReplicatedLogEntry> entries = Arrays.asList(
277 newReplicatedLogEntry(2, 101, "foo"));
279 // The new commitIndex is 101
280 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
282 follower = createBehavior(context);
283 follower.handleMessage(leaderActor, appendEntries);
285 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
286 FollowerInitialSyncUpStatus.class);
287 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
289 assertFalse(syncStatus.isInitialSyncDone());
290 assertTrue("append entries reply should be true", reply.isSuccess());
// Same -1/-1 previous-entry message, but the follower's log is emptied and its snapshot index is set
// to 100, so index 100 is covered only by the snapshot. Expects initial sync "not done" and a
// successful reply.
294 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
295 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
297 MockRaftActorContext context = createActorContext();
// No entries are appended after the clear; only the snapshot index is set.
298 context.getReplicatedLog().clear(0,2);
299 context.getReplicatedLog().setSnapshotIndex(100);
301 List<ReplicatedLogEntry> entries = Arrays.asList(
302 newReplicatedLogEntry(2, 101, "foo"));
304 // The new commitIndex is 101
305 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
307 follower = createBehavior(context);
308 follower.handleMessage(leaderActor, appendEntries);
310 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
311 FollowerInitialSyncUpStatus.class);
312 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
314 assertFalse(syncStatus.isInitialSyncDone());
315 assertTrue("append entries reply should be true", reply.isSuccess());
// Snapshot index is 100 and the log is emptied, but the incoming entry is at index 105, leaving a gap
// between what the follower has and what the message carries. Expects initial sync "not done" and an
// unsuccessful reply.
// NOTE(review): the line opening the logStart( call is not visible in this view; only its string
// argument is. The label spells out "ReplicatedToAll" where the method name abbreviates to "ReplToAll".
319 public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
321 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
323 MockRaftActorContext context = createActorContext();
324 context.getReplicatedLog().clear(0,2);
325 context.getReplicatedLog().setSnapshotIndex(100);
327 List<ReplicatedLogEntry> entries = Arrays.asList(
328 newReplicatedLogEntry(2, 105, "foo"));
330 // The new commitIndex is 105
331 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
333 follower = createBehavior(context);
334 follower.handleMessage(leaderActor, appendEntries);
336 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
337 FollowerInitialSyncUpStatus.class);
338 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
340 assertFalse(syncStatus.isInitialSyncDone());
341 assertFalse("append entries reply should be false", reply.isSuccess());
// Walks one follower through initial sync-up in three steps:
//   1. first AppendEntries from leader-1 -> status message with initialSyncDone == false;
//   2. after the context is advanced to index 101, a second AppendEntries -> initialSyncDone == true;
//   3. the same message again -> no further status message is emitted.
345 public void testHandleSyncUpAppendEntries() {
346 logStart("testHandleSyncUpAppendEntries");
348 MockRaftActorContext context = createActorContext();
350 List<ReplicatedLogEntry> entries = Arrays.asList(
351 newReplicatedLogEntry(2, 101, "foo"));
353 // The new commitIndex is 101
354 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
356 follower = createBehavior(context);
357 follower.handleMessage(leaderActor, appendEntries);
359 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
360 FollowerInitialSyncUpStatus.class);
362 assertFalse(syncStatus.isInitialSyncDone());
364 // Clear all the messages
365 MessageCollectorActor.clearMessages(followerActor);
// Advance the follower's state to index 101 before the next message.
// NOTE(review): setLastLogEntry is defined outside this view; presumably it makes (term 1, index 101)
// the last entry of the context's log.
367 context.setLastApplied(101);
368 context.setCommitIndex(101);
369 setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
371 entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
373 // The new commitIndex is 102
374 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
375 follower.handleMessage(leaderActor, appendEntries);
377 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
379 assertTrue(syncStatus.isInitialSyncDone());
381 MessageCollectorActor.clearMessages(followerActor);
383 // Sending the same message again should not generate another message
385 follower.handleMessage(leaderActor, appendEntries);
// getFirstMatching (unlike expectFirstMatching above) is used because the absence of a message is
// the expected outcome; a null result is asserted below.
387 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
389 assertNull(syncStatus);
// A second leader ("leader-2") takes over before the follower has finished syncing with "leader-1".
// Both the first message and the one from the new leader must produce a status message with
// initialSyncDone == false.
393 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
394 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
396 MockRaftActorContext context = createActorContext();
398 List<ReplicatedLogEntry> entries = Arrays.asList(
399 newReplicatedLogEntry(2, 101, "foo"));
401 // The new commitIndex is 101
402 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
404 follower = createBehavior(context);
405 follower.handleMessage(leaderActor, appendEntries);
407 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
408 FollowerInitialSyncUpStatus.class);
410 assertFalse(syncStatus.isInitialSyncDone());
412 // Clear all the messages
413 MessageCollectorActor.clearMessages(followerActor);
// The follower is left at index 100, i.e. still behind leader-1's commit index of 101.
415 context.setLastApplied(100);
416 setLastLogEntry(context, 1, 100,
417 new MockRaftActorContext.MockPayload(""));
419 entries = Arrays.asList(
420 newReplicatedLogEntry(2, 101, "foo"));
422 // leader-2 is becoming the leader now; it sends prevLogIndex 45 and commitIndex 46
423 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
424 follower.handleMessage(leaderActor, appendEntries);
426 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
428 // We get a new message saying initial status is not done
429 assertFalse(syncStatus.isInitialSyncDone());
// The follower first completes initial sync with "leader-1" (status false, then true), after which
// "leader-2" takes over; the message from the new leader must produce a fresh status message with
// initialSyncDone == false again.
433 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
434 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
436 MockRaftActorContext context = createActorContext();
438 List<ReplicatedLogEntry> entries = Arrays.asList(
439 newReplicatedLogEntry(2, 101, "foo"));
441 // The new commitIndex is 101
442 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
444 follower = createBehavior(context);
445 follower.handleMessage(leaderActor, appendEntries);
447 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
448 FollowerInitialSyncUpStatus.class);
450 assertFalse(syncStatus.isInitialSyncDone());
452 // Clear all the messages
453 MessageCollectorActor.clearMessages(followerActor);
// Advance the follower to index 101 so the next message from leader-1 finds it caught up.
455 context.setLastApplied(101);
456 context.setCommitIndex(101);
457 setLastLogEntry(context, 1, 101,
458 new MockRaftActorContext.MockPayload(""));
460 entries = Arrays.asList(
461 newReplicatedLogEntry(2, 101, "foo"));
463 // The new commitIndex is 102
464 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
465 follower.handleMessage(leaderActor, appendEntries);
467 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
469 assertTrue(syncStatus.isInitialSyncDone());
471 // Clear all the messages
472 MessageCollectorActor.clearMessages(followerActor);
// Move the follower's applied index and last log entry back to 100 before the leader change.
474 context.setLastApplied(100);
475 setLastLogEntry(context, 1, 100,
476 new MockRaftActorContext.MockPayload(""));
478 entries = Arrays.asList(
479 newReplicatedLogEntry(2, 101, "foo"));
481 // leader-2 is becoming the leader now; it sends prevLogIndex 45 and commitIndex 46
482 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
483 follower.handleMessage(leaderActor, appendEntries);
485 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
487 // We get a new message saying initial status is not done
488 assertFalse(syncStatus.isInitialSyncDone());
492 * This test verifies that when an AppendEntries RPC is received by a RaftActor
493 * with a commitIndex that is greater than what has been applied to the
494 * state machine of the RaftActor, the RaftActor applies the state and
495 * sets its current applied state to the commitIndex of the sender.
// Follower starts with lastApplied 100, a last log entry at (term 1, index 100) and snapshot index 99.
// An AppendEntries carrying entry 101 and commit index 101 must leave context.getLastApplied() at 101.
498 public void testHandleAppendEntriesWithNewerCommitIndex() {
499 logStart("testHandleAppendEntriesWithNewerCommitIndex");
501 MockRaftActorContext context = createActorContext();
503 context.setLastApplied(100);
// NOTE(review): setLastLogEntry is defined outside this view; presumably it makes (term 1, index 100)
// the last entry of the context's log.
504 setLastLogEntry(context, 1, 100,
505 new MockRaftActorContext.MockPayload(""));
506 context.getReplicatedLog().setSnapshotIndex(99);
508 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
509 newReplicatedLogEntry(2, 101, "foo"));
511 // The new commitIndex is 101
512 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
514 follower = createBehavior(context);
515 follower.handleMessage(leaderActor, appendEntries);
517 assertEquals("getLastApplied", 101L, context.getLastApplied());
521 * This test verifies that when an AppendEntries is received with a prevLogTerm
522 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
523 * then the RaftActor does not change its state and it returns a failure.
// Sends an entry-less AppendEntries naming (index 0, term 2) as the previous entry to a follower on
// the default mock context. handleMessage must return the same Follower instance and the reply
// collected at leaderActor must report failure.
// NOTE(review): the test relies on the default log's entry at index 0 having a term other than 2;
// that log is built outside this view.
526 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
527 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
529 MockRaftActorContext context = createActorContext();
531 AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
533 follower = createBehavior(context);
535 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior change: the very same (spied) instance is returned.
537 Assert.assertSame(follower, newBehavior);
539 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
540 AppendEntriesReply.class);
542 assertEquals("isSuccess", false, reply.isSuccess());
// The follower's log is rebuilt with MockReplicatedLogBuilder.createEntries(5, 8, 3) and its snapshot
// is set to index 4 / term 3. An entry-less AppendEntries naming index 1 (below the snapshot index)
// as the previous entry must leave the behavior unchanged and be answered with success.
// NOTE(review): createEntries' arguments are presumably (start index, end index, term) - confirm.
546 public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
547 logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
549 MockRaftActorContext context = createActorContext();
550 context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
551 context.getReplicatedLog().setSnapshotIndex(4);
552 context.getReplicatedLog().setSnapshotTerm(3);
554 AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
556 follower = createBehavior(context);
558 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
560 Assert.assertSame(follower, newBehavior);
562 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
564 assertEquals("isSuccess", true, reply.isSuccess());
568 * This test verifies that when a new AppendEntries message is received with
569 * new entries and the logs of the sender and receiver match that the new
570 * entries get added to the log and the log is incremented by the number of
571 * entries received in appendEntries.
// Receiver log holds indexes 0..2 (term 1). An AppendEntries naming (index 2, term 1) as the previous
// entry and carrying entries 3 and 4 must append both, keep the same behavior instance, record the
// leader's id and payload version on the behavior, and produce a successful reply.
574 public void testHandleAppendEntriesAddNewEntries() {
575 logStart("testHandleAppendEntriesAddNewEntries");
577 MockRaftActorContext context = createActorContext();
579 // First set the receiver's term to a lower number
580 context.getTermInformation().update(1, "test");
582 // Prepare the receiver's log
583 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
584 log.append(newReplicatedLogEntry(1, 0, "zero"));
585 log.append(newReplicatedLogEntry(1, 1, "one"));
586 log.append(newReplicatedLogEntry(1, 2, "two"));
588 context.setReplicatedLog(log);
590 // Prepare the entries to be sent with AppendEntries
591 List<ReplicatedLogEntry> entries = new ArrayList<>();
592 entries.add(newReplicatedLogEntry(1, 3, "three"));
593 entries.add(newReplicatedLogEntry(1, 4, "four"));
595 // Send appendEntries with the same term as was set on the receiver
596 // before the new behavior was created (1 in this case)
597 // This will not work for a Candidate because as soon as a Candidate
598 // is created it increments the term
599 short leaderPayloadVersion = 10;
600 String leaderId = "leader-1";
601 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
603 follower = createBehavior(context);
605 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
607 Assert.assertSame(follower, newBehavior);
// The log's last index must now be 4, with the two sent entries stored at indexes 3 and 4.
609 assertEquals("Next index", 5, log.last().getIndex() + 1);
610 assertEquals("Entry 3", entries.get(0), log.get(3));
611 assertEquals("Entry 4", entries.get(1), log.get(4));
613 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
614 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
// NOTE(review): the helper is defined outside this view; its arguments are presumably
// (term, success, followerId, lastLogTerm, lastLogIndex) - confirm.
616 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
620 * This test verifies that when a new AppendEntries message is received with
621 * new entries and the logs of the sender and receiver are out-of-sync that
622 * the log is first corrected by removing the out of sync entries from the
623 * log and then adding in the new entries sent with the AppendEntries message.
// Receiver log holds indexes 0..2 (term 1). The leader's message names (index 1, term 1) as the
// previous entry and carries term-2 entries for indexes 2 and 3, so the receiver's index 2 conflicts.
// The test asserts that index 1 is untouched, indexes 2 and 3 hold the sent entries, the behavior
// instance is unchanged, and a successful reply is produced.
626 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
627 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
629 MockRaftActorContext context = createActorContext();
631 // First set the receiver's term to a lower number
632 context.getTermInformation().update(1, "test");
634 // Prepare the receiver's log
635 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
636 log.append(newReplicatedLogEntry(1, 0, "zero"));
637 log.append(newReplicatedLogEntry(1, 1, "one"));
638 log.append(newReplicatedLogEntry(1, 2, "two"));
640 context.setReplicatedLog(log);
642 // Prepare the entries to be sent with AppendEntries
643 List<ReplicatedLogEntry> entries = new ArrayList<>();
644 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
645 entries.add(newReplicatedLogEntry(2, 3, "three"));
647 // Send appendEntries with the same term as was set on the receiver
648 // before the new behavior was created (1 in this case)
649 // This will not work for a Candidate because as soon as a Candidate
650 // is created it increments the term
// NOTE(review): the comment above looks stale - the message below is sent with term 2, not 1.
651 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
653 follower = createBehavior(context);
655 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
657 Assert.assertSame(follower, newBehavior);
659 // The entry at index 2 will be found out-of-sync with the leader
660 // and will be removed
661 // Then the two new entries will be added to the log
662 // Thus making the log to have 4 entries
663 assertEquals("Next index", 4, log.last().getIndex() + 1);
664 //assertEquals("Entry 2", entries.get(0), log.get(2));
666 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
668 // Check that the entry at index 2 has the new data
669 assertEquals("Entry 2", entries.get(0), log.get(2));
671 assertEquals("Entry 3", entries.get(1), log.get(3));
// NOTE(review): helper defined outside this view; arguments are presumably
// (term, success, followerId, lastLogTerm, lastLogIndex) - confirm.
673 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
// Same conflicting-log setup as testHandleAppendEntriesCorrectReceiverLogEntries, but with a raft
// policy from createRaftPolicy(false, true) installed on the context. Here the reply is expected to
// report failure, with last log term/index still 1/2 and an extra `true` flag.
// NOTE(review): createRaftPolicy and the six-argument expectAndVerifyAppendEntriesReply are defined
// outside this view; judging by the test name the trailing `true` is a force-install-snapshot
// indicator - confirm.
677 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
678 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
680 MockRaftActorContext context = createActorContext();
682 // First set the receiver's term to a lower number
683 context.getTermInformation().update(1, "test");
685 // Prepare the receiver's log
686 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
687 log.append(newReplicatedLogEntry(1, 0, "zero"));
688 log.append(newReplicatedLogEntry(1, 1, "one"));
689 log.append(newReplicatedLogEntry(1, 2, "two"));
691 context.setReplicatedLog(log);
693 // Prepare the entries to be sent with AppendEntries
694 List<ReplicatedLogEntry> entries = new ArrayList<>();
695 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
696 entries.add(newReplicatedLogEntry(2, 3, "three"));
698 // Send appendEntries with the same term as was set on the receiver
699 // before the new behavior was created (1 in this case)
700 // This will not work for a Candidate because as soon as a Candidate
701 // is created it increments the term
// NOTE(review): the comment above looks stale - the message below is sent with term 2, not 1.
702 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
// The policy is installed before the behavior is created.
704 context.setRaftPolicy(createRaftPolicy(false, true));
705 follower = createBehavior(context);
707 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
709 Assert.assertSame(follower, newBehavior);
711 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
// Receiver log holds indexes 0..2 only. The message names index 3 as the previous entry (absent from
// the receiver) and carries entry 4. The behavior instance must be unchanged and the reply must
// report failure with the receiver's last log term/index still 1/2.
715 public void testHandleAppendEntriesPreviousLogEntryMissing() {
716 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
718 final MockRaftActorContext context = createActorContext();
720 // Prepare the receiver's log
721 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
722 log.append(newReplicatedLogEntry(1, 0, "zero"));
723 log.append(newReplicatedLogEntry(1, 1, "one"));
724 log.append(newReplicatedLogEntry(1, 2, "two"));
726 context.setReplicatedLog(log);
728 // Prepare the entries to be sent with AppendEntries
729 List<ReplicatedLogEntry> entries = new ArrayList<>();
730 entries.add(newReplicatedLogEntry(1, 4, "four"));
732 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
734 follower = createBehavior(context);
736 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
738 Assert.assertSame(follower, newBehavior);
// NOTE(review): helper defined outside this view; arguments are presumably
// (term, success, followerId, lastLogTerm, lastLogIndex) - confirm.
740 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
// Receiver log holds indexes 0..1 (term 1). Step 1 re-sends the entry at index 1: the log must not
// grow and the reply must succeed. Step 2 re-sends index 1 together with a new index 2: the log must
// grow by exactly one entry and the reply must succeed with last index 2.
744 public void testHandleAppendEntriesWithExistingLogEntry() {
745 logStart("testHandleAppendEntriesWithExistingLogEntry");
747 MockRaftActorContext context = createActorContext();
749 context.getTermInformation().update(1, "test");
751 // Prepare the receiver's log
752 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
753 log.append(newReplicatedLogEntry(1, 0, "zero"));
754 log.append(newReplicatedLogEntry(1, 1, "one"));
756 context.setReplicatedLog(log);
758 // Send the last entry again.
759 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
761 follower = createBehavior(context);
763 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
// Last index is still 1, so nothing was appended for the duplicate.
765 assertEquals("Next index", 2, log.last().getIndex() + 1);
766 assertEquals("Entry 1", entries.get(0), log.get(1));
768 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
770 // Send the last entry again and also a new one.
772 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so the verification below sees only the reply to the second message.
774 MessageCollectorActor.clearMessages(leaderActor);
775 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
777 assertEquals("Next index", 3, log.last().getIndex() + 1);
778 assertEquals("Entry 1", entries.get(0), log.get(1));
779 assertEquals("Entry 2", entries.get(1), log.get(2));
781 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
// Receiver log contains no entries but reports snapshot index 3 / term 1, as it would after a
// snapshot install. An AppendEntries naming (index 3, term 1) as the previous entry and carrying
// entry 4 must leave the behavior instance unchanged and be answered with success and last index 4.
// The logStart label now matches the method name; it previously read
// "testHandleAppendAfterInstallingSnapshot".
785 public void testHandleAppendEntriesAfterInstallingSnapshot() {
786 logStart("testHandleAppendEntriesAfterInstallingSnapshot");
788 MockRaftActorContext context = createActorContext();
790 // Prepare the receiver's log
791 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
793 // Set up a log as if it has been snapshotted
794 log.setSnapshotIndex(3);
795 log.setSnapshotTerm(1);
797 context.setReplicatedLog(log);
799 // Prepare the entries to be sent with AppendEntries
800 List<ReplicatedLogEntry> entries = new ArrayList<>();
801 entries.add(newReplicatedLogEntry(1, 4, "four"));
803 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
805 follower = createBehavior(context);
807 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
809 Assert.assertSame(follower, newBehavior);
// NOTE(review): helper defined outside this view; arguments are presumably
// (term, success, followerId, lastLogTerm, lastLogIndex) - confirm.
811 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
815 * This test verifies that when InstallSnapshot is received by
816 * the follower it is applied correctly.
// Sends a snapshot to the follower as a sequence of InstallSnapshot chunks, then checks the
// ApplySnapshot message collected from followerActor and the per-chunk InstallSnapshotReply
// messages collected from leaderActor.
// NOTE(review): offset, chunkSize and chunkIndex are used below but their declarations are not
// among the visible lines of this listing - confirm against the complete source.
819 public void testHandleInstallSnapshot() {
820 logStart("testHandleInstallSnapshot");
822 MockRaftActorContext context = createActorContext();
// The election term / votedFor set here are the values asserted on the snapshot further down.
823 context.getTermInformation().update(1, "leader");
825 follower = createBehavior(context);
827 ByteString bsSnapshot = createSnapshot();
829 int snapshotLength = bsSnapshot.size();
// Ceiling division: a trailing partial chunk counts as one more chunk.
831 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
832 int lastIncludedIndex = 1;
// Keeps the most recently sent message so the applied snapshot can be compared against it.
834 InstallSnapshot lastInstallSnapshot = null;
836 for (int i = 0; i < totalChunks; i++) {
837 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
838 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
839 chunkData, chunkIndex, totalChunks);
840 follower.handleMessage(leaderActor, lastInstallSnapshot);
// Advances by a literal 50 - presumably equal to chunkSize; TODO confirm.
841 offset = offset + 50;
// The follower is expected to have sent an ApplySnapshot to followerActor.
846 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
847 ApplySnapshot.class);
848 Snapshot snapshot = applySnapshot.getSnapshot();
849 assertNotNull(lastInstallSnapshot);
// Both the "last" and "last applied" index/term must equal the values of the last message sent.
850 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
851 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
852 snapshot.getLastAppliedTerm());
853 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
854 snapshot.getLastAppliedIndex());
855 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
// The reassembled state must be a ByteState holding exactly the bytes of the original snapshot.
856 assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
857 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
858 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
859 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
// Report success through the ApplySnapshot callback before inspecting the replies.
860 applySnapshot.getCallback().onSuccess();
// One reply per chunk is expected on the leader side.
862 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
863 leaderActor, InstallSnapshotReply.class);
864 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
// Replies must carry consecutive chunk indexes, counted up from the current chunkIndex value.
// NOTE(review): any reset of chunkIndex before this loop is not visible in this listing.
867 for (InstallSnapshotReply reply: replies) {
868 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
869 assertEquals("getTerm", 1, reply.getTerm());
870 assertEquals("isSuccess", true, reply.isSuccess());
871 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
// After the whole snapshot is handled the follower must no longer hold a snapshot tracker.
874 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
878 * Verify that when an AppendEntries is sent to a follower during a snapshot install
879 * the Follower short-circuits the processing of the AppendEntries message.
// Starts a multi-chunk snapshot install by sending only the first chunk, then sends an
// AppendEntries from the same leader ("leader", term 1). The reply must be successful and report
// the follower's current log last index/term and term, and the snapshot tracker must still exist.
// NOTE(review): chunkSize is used below but its declaration is not visible in this listing.
882 public void testReceivingAppendEntriesDuringInstallSnapshot() {
883 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
885 MockRaftActorContext context = createActorContext();
887 follower = createBehavior(context);
889 ByteString bsSnapshot = createSnapshot();
890 int snapshotLength = bsSnapshot.size();
// Ceiling division: a trailing partial chunk counts as one more chunk.
892 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
893 int lastIncludedIndex = 1;
895 // Check that snapshot installation is not in progress
896 assertNull(follower.getSnapshotTracker());
898 // Make sure that we have more than 1 chunk to send
899 assertTrue(totalChunks > 1);
901 // Send an install snapshot with the first chunk to start the process of installing a snapshot
902 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
903 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
904 chunkData, 1, totalChunks));
906 // Check if snapshot installation is in progress now
907 assertNotNull(follower.getSnapshotTracker());
909 // Send an append entry
910 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
911 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
913 follower.handleMessage(leaderActor, appendEntries);
// The reply is compared against the context's current log and term rather than fixed values.
915 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
916 assertEquals("isSuccess", true, reply.isSuccess());
917 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
918 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
919 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
// The in-progress snapshot install must not have been discarded.
921 assertNotNull(follower.getSnapshotTracker());
// Starts a multi-chunk snapshot install from "leader" (term 1), then sends an AppendEntries with
// term 2 and leader id "new-leader". The reply must be successful with log last index 2, log last
// term 2 and term 2, and the follower's snapshot tracker must be null afterwards.
// NOTE(review): chunkSize is used below but its declaration is not visible in this listing.
925 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
926 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
928 MockRaftActorContext context = createActorContext();
930 follower = createBehavior(context);
932 ByteString bsSnapshot = createSnapshot();
933 int snapshotLength = bsSnapshot.size();
// Ceiling division: a trailing partial chunk counts as one more chunk.
935 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
936 int lastIncludedIndex = 1;
938 // Check that snapshot installation is not in progress
939 assertNull(follower.getSnapshotTracker());
941 // Make sure that we have more than 1 chunk to send
942 assertTrue(totalChunks > 1);
944 // Send an install snapshot with the first chunk to start the process of installing a snapshot
945 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
946 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
947 chunkData, 1, totalChunks));
949 // Check if snapshot installation is in progress now
950 assertNotNull(follower.getSnapshotTracker());
952 // Send appendEntries with a new term and leader.
953 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
954 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
956 follower.handleMessage(leaderActor, appendEntries);
958 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
959 assertEquals("isSuccess", true, reply.isSuccess());
960 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
961 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
962 assertEquals("getTerm", 2, reply.getTerm());
// Unlike the same-leader case, the snapshot tracker is expected to be gone.
964 assertNull(follower.getSnapshotTracker());
// Installs a complete snapshot chunk by chunk and expects a FollowerInitialSyncUpStatus with
// isInitialSyncDone() == false on followerActor; then moves the context to index 101, sends an
// AppendEntries and expects a FollowerInitialSyncUpStatus with isInitialSyncDone() == true.
// NOTE(review): offset, chunkSize and chunkIndex are used below but their declarations are not
// among the visible lines of this listing - confirm against the complete source.
968 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
// The log marker now carries the full method name, as the other tests in this class do; the
// previous label was truncated to "testInitialSyncUpWithHandleInstallSnapshot".
969 logStart("testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries");
971 MockRaftActorContext context = createActorContext();
972 context.setCommitIndex(-1);
974 follower = createBehavior(context);
976 ByteString bsSnapshot = createSnapshot();
978 int snapshotLength = bsSnapshot.size();
// Ceiling division: a trailing partial chunk counts as one more chunk.
980 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
981 int lastIncludedIndex = 1;
983 InstallSnapshot lastInstallSnapshot = null;
985 for (int i = 0; i < totalChunks; i++) {
986 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
987 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
988 chunkData, chunkIndex, totalChunks);
989 follower.handleMessage(leaderActor, lastInstallSnapshot);
// Advances by a literal 50 - presumably equal to chunkSize; TODO confirm.
990 offset = offset + 50;
// While the snapshot is being installed the follower must report that initial sync is not done.
995 FollowerInitialSyncUpStatus syncStatus =
996 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
998 assertFalse(syncStatus.isInitialSyncDone());
1000 // Clear all the messages
1001 MessageCollectorActor.clearMessages(followerActor);
// Put the follower's applied/commit index and last log entry at index 101 before the append.
1003 context.setLastApplied(101);
1004 context.setCommitIndex(101);
1005 setLastLogEntry(context, 1, 101,
1006 new MockRaftActorContext.MockPayload(""));
1008 List<ReplicatedLogEntry> entries = Arrays.asList(
1009 newReplicatedLogEntry(2, 101, "foo"));
1011 // The new commitIndex is 101
1012 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
1013 follower.handleMessage(leaderActor, appendEntries);
// Messages were cleared above, so the first match here is the status sent after the append.
1015 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
1017 assertTrue(syncStatus.isInitialSyncDone());
// Sends, as the very first InstallSnapshot message, a chunk labelled as chunk 3 of 3 and expects
// the follower to answer with an unsuccessful InstallSnapshotReply (chunk index -1, term 1) and
// to hold no snapshot tracker afterwards.
1021 public void testHandleOutOfSequenceInstallSnapshot() {
1022 logStart("testHandleOutOfSequenceInstallSnapshot");
1024 MockRaftActorContext context = createActorContext();
1026 follower = createBehavior(context);
1028 ByteString bsSnapshot = createSnapshot();
// The chunk bytes are taken from offset 10 with chunk size 50; the last two arguments (3, 3)
// are the chunk index and the total chunk count.
1030 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1031 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1032 follower.handleMessage(leaderActor, installSnapshot);
1034 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1035 InstallSnapshotReply.class);
1037 assertEquals("isSuccess", false, reply.isSuccess());
1038 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1039 assertEquals("getTerm", 1, reply.getTerm());
1040 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1042 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
// Creates a follower from a freshly created context and expects a TimeoutNow message on
// followerActor in less time than the configured election timeout interval; handling that
// TimeoutNow must yield a Candidate behavior.
// NOTE(review): that the context has no peers is implied by the test name only - confirm
// against createActorContext().
1046 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1047 MockRaftActorContext context = createActorContext();
// Started before the behavior is created so the elapsed time covers its start-up.
1049 Stopwatch stopwatch = Stopwatch.createStarted();
1051 follower = createBehavior(context);
1053 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1055 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1057 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1059 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1060 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
// With a custom raft policy installed, the follower must still receive a TimeoutNow message, but
// handling it must return the same Follower behavior.
1064 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1065 MockRaftActorContext context = createActorContext();
// Config override: the election timeout interval is fixed at 100 ms.
1066 context.setConfigParams(new DefaultConfigParamsImpl() {
1068 public FiniteDuration getElectionTimeOutInterval() {
1069 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): presumably createRaftPolicy(false, false) disables automatic elections, as the
// test name suggests - confirm against the helper's parameters.
1073 context.setRaftPolicy(createRaftPolicy(false, false));
1075 follower = createBehavior(context);
1077 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1078 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1079 assertSame("handleMessage result", follower, newBehavior);
// Constructs a Follower directly (initial leader id "leader") and expects an ElectionTimeout
// message on followerActor; handling it must return the same Follower and leave it with a null
// leader id.
1083 public void testFollowerSchedulesElectionIfNonVoting() {
1084 MockRaftActorContext context = createActorContext();
// NOTE(review): presumably ServerInfo(id, false) marks this node as non-voting, as the test
// name suggests - confirm against ServerInfo.
1085 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
// Heartbeat interval of 100 ms with an election timeout factor of 1.
1086 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1087 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1088 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1090 follower = new Follower(context, "leader", (short)1);
1092 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1093 ElectionTimeout.class);
1094 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1095 assertSame("handleMessage result", follower, newBehavior);
1096 assertNull("Expected null leaderId", follower.getLeaderId());
// Hands the follower an anonymous RaftRPC implementation and verifies with Mockito that
// scheduleElection(FiniteDuration) was invoked on the follower.
// NOTE(review): verify(follower) requires a Mockito mock or spy - presumably createBehavior
// returns a spy; confirm.
1100 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1101 MockRaftActorContext context = createActorContext();
1102 follower = createBehavior(context);
1103 follower.handleMessage(leaderActor, new RaftRPC() {
1104 private static final long serialVersionUID = 1L;
// NOTE(review): the body of getTerm() is not among the visible lines of this listing.
1107 public long getTerm() {
1111 verify(follower).scheduleElection(any(FiniteDuration.class));
// Hands the follower a plain String message and verifies with Mockito that
// scheduleElection(FiniteDuration) was never invoked on the follower.
// NOTE(review): verify(follower, never()) requires a Mockito mock or spy - presumably
// createBehavior returns a spy; confirm.
1115 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1116 MockRaftActorContext context = createActorContext();
1117 follower = createBehavior(context);
1118 follower.handleMessage(leaderActor, "non-raft-rpc");
1119 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
// Runs a persistent MockRaftActor with a snapshot batch count of 2 and sends it one AppendEntries
// carrying two entries (indexes 0 and 1). Expects a successful reply, a saved snapshot covering
// both entries with no unapplied entries, and a single ApplyJournalEntries left in the journal.
1123 public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1124 String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
// Seed the in-memory journal for this id with an election term entry (term 1, no votedFor).
1127 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1129 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1130 config.setSnapshotBatchCount(2);
1131 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state through this reference, which is filled in once the actor
// exists.
1133 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1134 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1135 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1136 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1137 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1138 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1139 followerRaftActor.set(followerActorRef.underlyingActor());
1140 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register the latches that the waitFor... calls below block on.
1142 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1143 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1144 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1146 List<ReplicatedLogEntry> entries = Arrays.asList(
1147 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1149 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1151 followerActorRef.tell(appendEntries, leaderActor);
1153 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1154 assertEquals("isSuccess", true, reply.isSuccess());
1156 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1158 InMemoryJournal.waitForDeleteMessagesComplete(id);
1159 InMemoryJournal.waitForWriteMessagesComplete(id);
1160 // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1161 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1162 // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1163 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1164 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1165 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1166 assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// The snapshot must cover both entries: nothing unapplied, last/last-applied index 1, term 1.
1168 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1169 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1170 assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1171 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1172 assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1173 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1174 MockRaftActor.fromState(snapshot.getState()));
// Runs a persistent MockRaftActor with a snapshot batch count of 2 and sends it one AppendEntries
// carrying three entries (indexes 0..2). Expects a saved snapshot covering all three entries,
// an empty in-memory log with snapshot index 2, and the same state after the actor is killed and
// re-created from persistence.
1178 public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1179 String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
// Seed the in-memory journal for this id with an election term entry (term 1, no votedFor).
1182 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1184 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1185 config.setSnapshotBatchCount(2);
1186 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state through this reference, which is filled in once the actor
// exists.
1188 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1189 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1190 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1191 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1192 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1193 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1194 followerRaftActor.set(followerActorRef.underlyingActor());
1195 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register the latches that the waitFor... calls below block on.
1197 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1198 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1199 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1201 List<ReplicatedLogEntry> entries = Arrays.asList(
1202 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1203 newReplicatedLogEntry(1, 2, "three"));
1205 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1207 followerActorRef.tell(appendEntries, leaderActor);
1209 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1210 assertEquals("isSuccess", true, reply.isSuccess());
1212 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1214 InMemoryJournal.waitForDeleteMessagesComplete(id);
1215 InMemoryJournal.waitForWriteMessagesComplete(id);
1216 // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1217 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1218 // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1219 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1220 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1221 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1222 assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// The snapshot must cover all three entries: nothing unapplied, last/last-applied index 2.
1224 assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1225 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1226 assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1227 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1228 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1229 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1230 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
// After the snapshot the in-memory log holds no entries and starts after index 2.
1232 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1233 assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1235 // Reinstate the actor from persistence
1237 actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1239 followerActorRef = actorFactory.createTestActor(builder.props()
1240 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1241 followerRaftActor.set(followerActorRef.underlyingActor());
1242 followerRaftActor.get().waitForInitializeBehaviorComplete();
// The re-created actor must come back with the same log position and application state.
1244 assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1245 assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1246 assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1247 assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1248 assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1249 entries.get(2).getData()), followerRaftActor.get().getState());
// Runs a persistent MockRaftActor with a snapshot batch count of 1 and sends it one AppendEntries
// carrying three entries (indexes 0..2). Expects a saved snapshot whose last applied index is 0,
// whose last index is 2, and which lists entries 1 and 2 as unapplied.
1253 public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1254 String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
// Seed the in-memory journal for this id with an election term entry (term 1, no votedFor).
1257 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1259 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1260 config.setSnapshotBatchCount(1);
1261 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// The cohort reads the actor's state through this reference, which is filled in once the actor
// exists.
1263 final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1264 RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1265 Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1266 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1267 TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1268 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1269 followerRaftActor.set(followerActorRef.underlyingActor());
1270 followerRaftActor.get().waitForInitializeBehaviorComplete();
// Register the latches that the waitFor... calls below block on.
1272 InMemorySnapshotStore.addSnapshotSavedLatch(id);
1273 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1274 InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1276 List<ReplicatedLogEntry> entries = Arrays.asList(
1277 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1278 newReplicatedLogEntry(1, 2, "three"));
1280 AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1282 followerActorRef.tell(appendEntries, leaderActor);
1284 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1285 assertEquals("isSuccess", true, reply.isSuccess());
1287 final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1289 InMemoryJournal.waitForDeleteMessagesComplete(id);
1290 InMemoryJournal.waitForWriteMessagesComplete(id);
1291 // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1292 // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1293 // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1294 List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1295 assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1296 assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1297 assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
// Only entry 0 is applied; entries 1 and 2 must be carried in the snapshot as unapplied.
1299 assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1300 assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1301 assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1302 assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1303 assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1304 assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1305 assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1306 assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1307 MockRaftActor.fromState(snapshot.getState()));
// Checks the needsLeaderAddress flag of AppendEntriesReply. With peer "leader" registered with a
// null address and a no-op address resolver, an AppendEntries without a leader address must be
// answered with the flag set. An AppendEntries that carries the leader's path must be answered
// with the flag cleared, and that path must be passed to the resolver via setResolved.
1311 public void testNeedsLeaderAddress() {
1312 logStart("testNeedsLeaderAddress");
1314 MockRaftActorContext context = createActorContext();
1315 context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
// The leader peer is registered with a null address.
1316 context.addToPeers("leader", null, VotingState.VOTING);
1317 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
1319 follower = createBehavior(context);
1321 follower.handleMessage(leaderActor,
1322 new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0));
1324 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1325 assertTrue(reply.isNeedsLeaderAddress());
// Clear the collected messages so the next expectFirstMatching sees only the second reply.
1326 MessageCollectorActor.clearMessages(leaderActor);
// Swap in a mock resolver so the setResolved call can be verified.
1328 PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
1329 ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
1331 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1,
1332 (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
1334 reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1335 assertFalse(reply.isNeedsLeaderAddress());
1337 verify(mockResolver).setResolved("leader", leaderActor.path().toString());
// Builds a RaftActorSnapshotCohort for the capture-snapshot tests. Its createSnapshot replies to
// the given actor with a CaptureSnapshotReply that wraps the current state of the MockRaftActor
// held in followerRaftActor; deserializeSnapshot is unsupported.
// The AtomicReference lets callers create the cohort before the actor exists and set it later.
1340 @SuppressWarnings("checkstyle:IllegalCatch")
1341 private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1342 final AtomicReference<MockRaftActor> followerRaftActor) {
1343 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1345 public void createSnapshot(final ActorRef actorRef,
1346 final java.util.Optional<OutputStream> installSnapshotStream) {
// The reply is sent to actorRef with actorRef itself as the sender.
1348 actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1349 installSnapshotStream), actorRef);
// NOTE(review): the handler body for RuntimeException is not among the visible lines.
1350 } catch (RuntimeException e) {
// Any other exception is wrapped in a RuntimeException with the original as its cause.
1352 } catch (Exception e) {
1353 throw new RuntimeException(e);
// NOTE(review): the body of applySnapshot is not among the visible lines.
1358 public void applySnapshot(final State snapshotState) {
1362 public State deserializeSnapshot(final ByteSource snapshotBytes) {
1363 throw new UnsupportedOperationException();
1366 return snapshotCohort;
// Copies one chunk of at most chunkSize bytes out of bs into a new byte array, shortening the
// chunk when it would run past the end of bs.
// NOTE(review): the declaration of start (presumably initialised from offset), the branch that
// joins the two size adjustments, and the return statement are not among the visible lines of
// this listing - confirm against the complete source.
1369 public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1370 int snapshotLength = bs.size();
1372 int size = chunkSize;
// A chunk size larger than the whole snapshot is clamped to the snapshot length.
1373 if (chunkSize > snapshotLength) {
1374 size = snapshotLength;
// A chunk that would extend past the end is shortened to the remaining bytes.
1376 if (start + chunkSize > snapshotLength) {
1377 size = snapshotLength - start;
1381 byte[] nextChunk = new byte[size];
// Copies size bytes from position start of bs to position 0 of nextChunk.
1382 bs.copyTo(nextChunk, start, 0, size);
// Convenience overload: verifies the AppendEntriesReply with isForceInstallSnapshot expected to
// be false, by delegating to the six-argument overload.
1386 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1387 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1388 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
// Takes the first AppendEntriesReply collected from leaderActor and asserts each of its fields
// against the expected values passed in. The payload version is compared with the payloadVersion
// field, and isNeedsLeaderAddress is always expected to be false.
1391 private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1392 final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1393 final boolean expForceInstallSnapshot) {
1395 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1396 AppendEntriesReply.class);
1398 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1399 assertEquals("getTerm", expTerm, reply.getTerm());
1400 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1401 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1402 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1403 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1404 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1405 assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
// Creates a SimpleReplicatedLogEntry whose payload is a MockPayload wrapping data.
// Note the argument order: this helper takes (term, index) while the SimpleReplicatedLogEntry
// constructor is called with (index, term).
1409 private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1410 return new SimpleReplicatedLogEntry(index, term,
1411 new MockRaftActorContext.MockPayload(data));
// Builds the snapshot payload used by the install-snapshot tests: a three-entry map
// ("1"->"A", "2"->"B", "3"->"C") converted to a ByteString via toByteString.
1414 private ByteString createSnapshot() {
1415 HashMap<String, String> followerSnapshot = new HashMap<>();
1416 followerSnapshot.put("1", "A");
1417 followerSnapshot.put("2", "B");
1418 followerSnapshot.put("3", "C");
1420 return toByteString(followerSnapshot);
// Runs the superclass check and then also asserts the recorded votedFor: it must be the
// candidate id when the RPC is a RequestVote, and null for any other RPC.
1424 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1425 final ActorRef actorRef, final RaftRPC rpc) {
1426 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1428 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1429 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1433 protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1434 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1435 assertEquals("isSuccess", true, reply.isSuccess());