2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.testkit.TestActorRef;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Assert;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.Snapshot;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
44 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
45 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
48 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
49 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
50 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
51 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
52 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
53 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
54 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
55 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
56 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
57 import scala.concurrent.duration.FiniteDuration;
59 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
61 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
62 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
64 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
65 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
67 private Follower follower;
69 private final short payloadVersion = 5;
73 public void tearDown() throws Exception {
74 if (follower != null) {
82 protected Follower createBehavior(RaftActorContext actorContext) {
83 return spy(new Follower(actorContext));
87 protected MockRaftActorContext createActorContext() {
88 return createActorContext(followerActor);
92 protected MockRaftActorContext createActorContext(ActorRef actorRef) {
93 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
94 context.setPayloadVersion(payloadVersion );
99 public void testThatAnElectionTimeoutIsTriggered() {
100 MockRaftActorContext actorContext = createActorContext();
101 follower = new Follower(actorContext);
103 MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
104 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
108 public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
109 logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
111 MockRaftActorContext context = createActorContext();
112 follower = new Follower(context);
114 Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
115 TimeUnit.MILLISECONDS);
116 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
118 assertTrue(raftBehavior instanceof Candidate);
122 public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
123 logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
125 MockRaftActorContext context = createActorContext();
126 ((DefaultConfigParamsImpl) context.getConfigParams())
127 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
128 ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
130 follower = new Follower(context);
131 context.setCurrentBehavior(follower);
133 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
134 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
135 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
138 Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
139 RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
140 assertTrue(raftBehavior instanceof Follower);
142 Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
143 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
144 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
147 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
148 raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
149 assertTrue(raftBehavior instanceof Follower);
153 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
154 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
156 MockRaftActorContext context = createActorContext();
158 context.getTermInformation().update(term, null);
160 follower = createBehavior(context);
162 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
164 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
166 assertEquals("isVoteGranted", true, reply.isVoteGranted());
167 assertEquals("getTerm", term, reply.getTerm());
168 verify(follower).scheduleElection(any(FiniteDuration.class));
172 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
173 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
175 MockRaftActorContext context = createActorContext();
177 context.getTermInformation().update(term, "test");
179 follower = createBehavior(context);
181 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
183 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
185 assertEquals("isVoteGranted", false, reply.isVoteGranted());
186 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
191 public void testHandleFirstAppendEntries() throws Exception {
192 logStart("testHandleFirstAppendEntries");
194 MockRaftActorContext context = createActorContext();
195 context.getReplicatedLog().clear(0,2);
196 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
197 context.getReplicatedLog().setSnapshotIndex(99);
199 List<ReplicatedLogEntry> entries = Arrays.asList(
200 newReplicatedLogEntry(2, 101, "foo"));
202 Assert.assertEquals(1, context.getReplicatedLog().size());
204 // The new commitIndex is 101
205 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
207 follower = createBehavior(context);
208 follower.handleMessage(leaderActor, appendEntries);
210 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
211 FollowerInitialSyncUpStatus.class);
212 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
214 assertFalse(syncStatus.isInitialSyncDone());
215 assertTrue("append entries reply should be true", reply.isSuccess());
219 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
220 logStart("testHandleFirstAppendEntries");
222 MockRaftActorContext context = createActorContext();
224 List<ReplicatedLogEntry> entries = Arrays.asList(
225 newReplicatedLogEntry(2, 101, "foo"));
227 // The new commitIndex is 101
228 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
230 follower = createBehavior(context);
231 follower.handleMessage(leaderActor, appendEntries);
233 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
234 FollowerInitialSyncUpStatus.class);
235 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
237 assertFalse(syncStatus.isInitialSyncDone());
238 assertFalse("append entries reply should be false", reply.isSuccess());
242 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
244 logStart("testHandleFirstAppendEntries");
246 MockRaftActorContext context = createActorContext();
247 context.getReplicatedLog().clear(0,2);
248 context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
249 context.getReplicatedLog().setSnapshotIndex(99);
251 List<ReplicatedLogEntry> entries = Arrays.asList(
252 newReplicatedLogEntry(2, 101, "foo"));
254 // The new commitIndex is 101
255 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
257 follower = createBehavior(context);
258 follower.handleMessage(leaderActor, appendEntries);
260 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
261 FollowerInitialSyncUpStatus.class);
262 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
264 assertFalse(syncStatus.isInitialSyncDone());
265 assertTrue("append entries reply should be true", reply.isSuccess());
269 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
271 logStart("testHandleFirstAppendEntries");
273 MockRaftActorContext context = createActorContext();
274 context.getReplicatedLog().clear(0,2);
275 context.getReplicatedLog().setSnapshotIndex(100);
277 List<ReplicatedLogEntry> entries = Arrays.asList(
278 newReplicatedLogEntry(2, 101, "foo"));
280 // The new commitIndex is 101
281 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
283 follower = createBehavior(context);
284 follower.handleMessage(leaderActor, appendEntries);
286 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
287 FollowerInitialSyncUpStatus.class);
288 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
290 assertFalse(syncStatus.isInitialSyncDone());
291 assertTrue("append entries reply should be true", reply.isSuccess());
295 public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
298 "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
300 MockRaftActorContext context = createActorContext();
301 context.getReplicatedLog().clear(0,2);
302 context.getReplicatedLog().setSnapshotIndex(100);
304 List<ReplicatedLogEntry> entries = Arrays.asList(
305 newReplicatedLogEntry(2, 105, "foo"));
307 // The new commitIndex is 101
308 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
310 follower = createBehavior(context);
311 follower.handleMessage(leaderActor, appendEntries);
313 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
314 FollowerInitialSyncUpStatus.class);
315 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
317 assertFalse(syncStatus.isInitialSyncDone());
318 assertFalse("append entries reply should be false", reply.isSuccess());
322 public void testHandleSyncUpAppendEntries() throws Exception {
323 logStart("testHandleSyncUpAppendEntries");
325 MockRaftActorContext context = createActorContext();
327 List<ReplicatedLogEntry> entries = Arrays.asList(
328 newReplicatedLogEntry(2, 101, "foo"));
330 // The new commitIndex is 101
331 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
333 follower = createBehavior(context);
334 follower.handleMessage(leaderActor, appendEntries);
336 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
337 FollowerInitialSyncUpStatus.class);
339 assertFalse(syncStatus.isInitialSyncDone());
341 // Clear all the messages
342 followerActor.underlyingActor().clear();
344 context.setLastApplied(101);
345 context.setCommitIndex(101);
346 setLastLogEntry(context, 1, 101,
347 new MockRaftActorContext.MockPayload(""));
349 entries = Arrays.asList(
350 newReplicatedLogEntry(2, 101, "foo"));
352 // The new commitIndex is 101
353 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
354 follower.handleMessage(leaderActor, appendEntries);
356 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
358 assertTrue(syncStatus.isInitialSyncDone());
360 followerActor.underlyingActor().clear();
362 // Sending the same message again should not generate another message
364 follower.handleMessage(leaderActor, appendEntries);
366 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
368 assertNull(syncStatus);
373 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
374 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
376 MockRaftActorContext context = createActorContext();
378 List<ReplicatedLogEntry> entries = Arrays.asList(
379 newReplicatedLogEntry(2, 101, "foo"));
381 // The new commitIndex is 101
382 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
384 follower = createBehavior(context);
385 follower.handleMessage(leaderActor, appendEntries);
387 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
388 FollowerInitialSyncUpStatus.class);
390 assertFalse(syncStatus.isInitialSyncDone());
392 // Clear all the messages
393 followerActor.underlyingActor().clear();
395 context.setLastApplied(100);
396 setLastLogEntry(context, 1, 100,
397 new MockRaftActorContext.MockPayload(""));
399 entries = Arrays.asList(
400 newReplicatedLogEntry(2, 101, "foo"));
402 // leader-2 is becoming the leader now and it says the commitIndex is 45
403 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
404 follower.handleMessage(leaderActor, appendEntries);
406 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
408 // We get a new message saying initial status is not done
409 assertFalse(syncStatus.isInitialSyncDone());
415 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
416 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
418 MockRaftActorContext context = createActorContext();
420 List<ReplicatedLogEntry> entries = Arrays.asList(
421 newReplicatedLogEntry(2, 101, "foo"));
423 // The new commitIndex is 101
424 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
426 follower = createBehavior(context);
427 follower.handleMessage(leaderActor, appendEntries);
429 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
430 FollowerInitialSyncUpStatus.class);
432 assertFalse(syncStatus.isInitialSyncDone());
434 // Clear all the messages
435 followerActor.underlyingActor().clear();
437 context.setLastApplied(101);
438 context.setCommitIndex(101);
439 setLastLogEntry(context, 1, 101,
440 new MockRaftActorContext.MockPayload(""));
442 entries = Arrays.asList(
443 newReplicatedLogEntry(2, 101, "foo"));
445 // The new commitIndex is 101
446 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
447 follower.handleMessage(leaderActor, appendEntries);
449 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
451 assertTrue(syncStatus.isInitialSyncDone());
453 // Clear all the messages
454 followerActor.underlyingActor().clear();
456 context.setLastApplied(100);
457 setLastLogEntry(context, 1, 100,
458 new MockRaftActorContext.MockPayload(""));
460 entries = Arrays.asList(
461 newReplicatedLogEntry(2, 101, "foo"));
463 // leader-2 is becoming the leader now and it says the commitIndex is 45
464 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
465 follower.handleMessage(leaderActor, appendEntries);
467 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
469 // We get a new message saying initial status is not done
470 assertFalse(syncStatus.isInitialSyncDone());
476 * This test verifies that when an AppendEntries RPC is received by a RaftActor
477 * with a commitIndex that is greater than what has been applied to the
478 * state machine of the RaftActor, the RaftActor applies the state and
479 * sets it current applied state to the commitIndex of the sender.
482 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
483 logStart("testHandleAppendEntriesWithNewerCommitIndex");
485 MockRaftActorContext context = createActorContext();
487 context.setLastApplied(100);
488 setLastLogEntry(context, 1, 100,
489 new MockRaftActorContext.MockPayload(""));
490 context.getReplicatedLog().setSnapshotIndex(99);
492 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
493 newReplicatedLogEntry(2, 101, "foo"));
495 // The new commitIndex is 101
496 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
498 follower = createBehavior(context);
499 follower.handleMessage(leaderActor, appendEntries);
501 assertEquals("getLastApplied", 101L, context.getLastApplied());
505 * This test verifies that when an AppendEntries is received a specific prevLogTerm
506 * which does not match the term that is in RaftActors log entry at prevLogIndex
507 * then the RaftActor does not change it's state and it returns a failure.
510 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
511 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
513 MockRaftActorContext context = createActorContext();
515 // First set the receivers term to lower number
516 context.getTermInformation().update(95, "test");
518 // AppendEntries is now sent with a bigger term
519 // this will set the receivers term to be the same as the sender's term
520 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
522 follower = createBehavior(context);
524 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
526 Assert.assertSame(follower, newBehavior);
528 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
529 AppendEntriesReply.class);
531 assertEquals("isSuccess", false, reply.isSuccess());
535 * This test verifies that when a new AppendEntries message is received with
536 * new entries and the logs of the sender and receiver match that the new
537 * entries get added to the log and the log is incremented by the number of
538 * entries received in appendEntries.
541 public void testHandleAppendEntriesAddNewEntries() {
542 logStart("testHandleAppendEntriesAddNewEntries");
544 MockRaftActorContext context = createActorContext();
546 // First set the receivers term to lower number
547 context.getTermInformation().update(1, "test");
549 // Prepare the receivers log
550 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
551 log.append(newReplicatedLogEntry(1, 0, "zero"));
552 log.append(newReplicatedLogEntry(1, 1, "one"));
553 log.append(newReplicatedLogEntry(1, 2, "two"));
555 context.setReplicatedLog(log);
557 // Prepare the entries to be sent with AppendEntries
558 List<ReplicatedLogEntry> entries = new ArrayList<>();
559 entries.add(newReplicatedLogEntry(1, 3, "three"));
560 entries.add(newReplicatedLogEntry(1, 4, "four"));
562 // Send appendEntries with the same term as was set on the receiver
563 // before the new behavior was created (1 in this case)
564 // This will not work for a Candidate because as soon as a Candidate
565 // is created it increments the term
566 short leaderPayloadVersion = 10;
567 String leaderId = "leader-1";
568 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
570 follower = createBehavior(context);
572 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
574 Assert.assertSame(follower, newBehavior);
576 assertEquals("Next index", 5, log.last().getIndex() + 1);
577 assertEquals("Entry 3", entries.get(0), log.get(3));
578 assertEquals("Entry 4", entries.get(1), log.get(4));
580 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
581 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
583 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
587 * This test verifies that when a new AppendEntries message is received with
588 * new entries and the logs of the sender and receiver are out-of-sync that
589 * the log is first corrected by removing the out of sync entries from the
590 * log and then adding in the new entries sent with the AppendEntries message.
593 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
594 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
596 MockRaftActorContext context = createActorContext();
598 // First set the receivers term to lower number
599 context.getTermInformation().update(1, "test");
601 // Prepare the receivers log
602 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
603 log.append(newReplicatedLogEntry(1, 0, "zero"));
604 log.append(newReplicatedLogEntry(1, 1, "one"));
605 log.append(newReplicatedLogEntry(1, 2, "two"));
607 context.setReplicatedLog(log);
609 // Prepare the entries to be sent with AppendEntries
610 List<ReplicatedLogEntry> entries = new ArrayList<>();
611 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
612 entries.add(newReplicatedLogEntry(2, 3, "three"));
614 // Send appendEntries with the same term as was set on the receiver
615 // before the new behavior was created (1 in this case)
616 // This will not work for a Candidate because as soon as a Candidate
617 // is created it increments the term
618 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
620 follower = createBehavior(context);
622 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
624 Assert.assertSame(follower, newBehavior);
626 // The entry at index 2 will be found out-of-sync with the leader
627 // and will be removed
628 // Then the two new entries will be added to the log
629 // Thus making the log to have 4 entries
630 assertEquals("Next index", 4, log.last().getIndex() + 1);
631 //assertEquals("Entry 2", entries.get(0), log.get(2));
633 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
635 // Check that the entry at index 2 has the new data
636 assertEquals("Entry 2", entries.get(0), log.get(2));
638 assertEquals("Entry 3", entries.get(1), log.get(3));
640 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
644 public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
645 logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
647 MockRaftActorContext context = createActorContext();
649 // First set the receivers term to lower number
650 context.getTermInformation().update(1, "test");
652 // Prepare the receivers log
653 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
654 log.append(newReplicatedLogEntry(1, 0, "zero"));
655 log.append(newReplicatedLogEntry(1, 1, "one"));
656 log.append(newReplicatedLogEntry(1, 2, "two"));
658 context.setReplicatedLog(log);
660 // Prepare the entries to be sent with AppendEntries
661 List<ReplicatedLogEntry> entries = new ArrayList<>();
662 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
663 entries.add(newReplicatedLogEntry(2, 3, "three"));
665 // Send appendEntries with the same term as was set on the receiver
666 // before the new behavior was created (1 in this case)
667 // This will not work for a Candidate because as soon as a Candidate
668 // is created it increments the term
669 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
671 context.setRaftPolicy(createRaftPolicy(false, true));
672 follower = createBehavior(context);
674 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
676 Assert.assertSame(follower, newBehavior);
678 expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
682 public void testHandleAppendEntriesPreviousLogEntryMissing() {
683 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
685 final MockRaftActorContext context = createActorContext();
687 // Prepare the receivers log
688 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
689 log.append(newReplicatedLogEntry(1, 0, "zero"));
690 log.append(newReplicatedLogEntry(1, 1, "one"));
691 log.append(newReplicatedLogEntry(1, 2, "two"));
693 context.setReplicatedLog(log);
695 // Prepare the entries to be sent with AppendEntries
696 List<ReplicatedLogEntry> entries = new ArrayList<>();
697 entries.add(newReplicatedLogEntry(1, 4, "four"));
699 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
701 follower = createBehavior(context);
703 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
705 Assert.assertSame(follower, newBehavior);
707 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
711 public void testHandleAppendEntriesWithExistingLogEntry() {
712 logStart("testHandleAppendEntriesWithExistingLogEntry");
714 MockRaftActorContext context = createActorContext();
716 context.getTermInformation().update(1, "test");
718 // Prepare the receivers log
719 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
720 log.append(newReplicatedLogEntry(1, 0, "zero"));
721 log.append(newReplicatedLogEntry(1, 1, "one"));
723 context.setReplicatedLog(log);
725 // Send the last entry again.
726 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
728 follower = createBehavior(context);
730 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
732 assertEquals("Next index", 2, log.last().getIndex() + 1);
733 assertEquals("Entry 1", entries.get(0), log.get(1));
735 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
737 // Send the last entry again and also a new one.
739 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
741 leaderActor.underlyingActor().clear();
742 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
744 assertEquals("Next index", 3, log.last().getIndex() + 1);
745 assertEquals("Entry 1", entries.get(0), log.get(1));
746 assertEquals("Entry 2", entries.get(1), log.get(2));
748 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
752 public void testHandleAppendEntriesAfterInstallingSnapshot() {
753 logStart("testHandleAppendAfterInstallingSnapshot");
755 MockRaftActorContext context = createActorContext();
757 // Prepare the receivers log
758 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
760 // Set up a log as if it has been snapshotted
761 log.setSnapshotIndex(3);
762 log.setSnapshotTerm(1);
764 context.setReplicatedLog(log);
766 // Prepare the entries to be sent with AppendEntries
767 List<ReplicatedLogEntry> entries = new ArrayList<>();
768 entries.add(newReplicatedLogEntry(1, 4, "four"));
770 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
772 follower = createBehavior(context);
774 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
776 Assert.assertSame(follower, newBehavior);
778 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
783 * This test verifies that when InstallSnapshot is received by
784 * the follower its applied correctly.
787 public void testHandleInstallSnapshot() throws Exception {
788 logStart("testHandleInstallSnapshot");
790 MockRaftActorContext context = createActorContext();
791 context.getTermInformation().update(1, "leader");
793 follower = createBehavior(context);
795 ByteString bsSnapshot = createSnapshot();
797 int snapshotLength = bsSnapshot.size();
799 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
800 int lastIncludedIndex = 1;
802 InstallSnapshot lastInstallSnapshot = null;
804 for (int i = 0; i < totalChunks; i++) {
805 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
806 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
807 chunkData, chunkIndex, totalChunks);
808 follower.handleMessage(leaderActor, lastInstallSnapshot);
809 offset = offset + 50;
814 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
815 ApplySnapshot.class);
816 Snapshot snapshot = applySnapshot.getSnapshot();
817 assertNotNull(lastInstallSnapshot);
818 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
819 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
820 snapshot.getLastAppliedTerm());
821 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
822 snapshot.getLastAppliedIndex());
823 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
824 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
825 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
826 assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
827 applySnapshot.getCallback().onSuccess();
829 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
830 leaderActor, InstallSnapshotReply.class);
831 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
834 for (InstallSnapshotReply reply: replies) {
835 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
836 assertEquals("getTerm", 1, reply.getTerm());
837 assertEquals("isSuccess", true, reply.isSuccess());
838 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
841 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
846 * Verify that when an AppendEntries is sent to a follower during a snapshot install
847 * the Follower short-circuits the processing of the AppendEntries message.
850 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
851 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
853 MockRaftActorContext context = createActorContext();
855 follower = createBehavior(context);
857 ByteString bsSnapshot = createSnapshot();
858 int snapshotLength = bsSnapshot.size();
860 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
861 int lastIncludedIndex = 1;
863 // Check that snapshot installation is not in progress
864 assertNull(follower.getSnapshotTracker());
866 // Make sure that we have more than 1 chunk to send
867 assertTrue(totalChunks > 1);
869 // Send an install snapshot with the first chunk to start the process of installing a snapshot
870 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
871 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
872 chunkData, 1, totalChunks));
874 // Check if snapshot installation is in progress now
875 assertNotNull(follower.getSnapshotTracker());
877 // Send an append entry
878 AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
879 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
881 follower.handleMessage(leaderActor, appendEntries);
883 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
884 assertEquals("isSuccess", true, reply.isSuccess());
885 assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
886 assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
887 assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
889 assertNotNull(follower.getSnapshotTracker());
893 public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
894 logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
896 MockRaftActorContext context = createActorContext();
898 follower = createBehavior(context);
900 ByteString bsSnapshot = createSnapshot();
901 int snapshotLength = bsSnapshot.size();
903 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
904 int lastIncludedIndex = 1;
906 // Check that snapshot installation is not in progress
907 assertNull(follower.getSnapshotTracker());
909 // Make sure that we have more than 1 chunk to send
910 assertTrue(totalChunks > 1);
912 // Send an install snapshot with the first chunk to start the process of installing a snapshot
913 byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
914 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
915 chunkData, 1, totalChunks));
917 // Check if snapshot installation is in progress now
918 assertNotNull(follower.getSnapshotTracker());
920 // Send appendEntries with a new term and leader.
921 AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
922 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
924 follower.handleMessage(leaderActor, appendEntries);
926 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
927 assertEquals("isSuccess", true, reply.isSuccess());
928 assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
929 assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
930 assertEquals("getTerm", 2, reply.getTerm());
932 assertNull(follower.getSnapshotTracker());
936 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
937 logStart("testInitialSyncUpWithHandleInstallSnapshot");
939 MockRaftActorContext context = createActorContext();
940 context.setCommitIndex(-1);
942 follower = createBehavior(context);
944 ByteString bsSnapshot = createSnapshot();
946 int snapshotLength = bsSnapshot.size();
948 int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
949 int lastIncludedIndex = 1;
951 InstallSnapshot lastInstallSnapshot = null;
953 for (int i = 0; i < totalChunks; i++) {
954 byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
955 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
956 chunkData, chunkIndex, totalChunks);
957 follower.handleMessage(leaderActor, lastInstallSnapshot);
958 offset = offset + 50;
963 FollowerInitialSyncUpStatus syncStatus =
964 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
966 assertFalse(syncStatus.isInitialSyncDone());
968 // Clear all the messages
969 followerActor.underlyingActor().clear();
971 context.setLastApplied(101);
972 context.setCommitIndex(101);
973 setLastLogEntry(context, 1, 101,
974 new MockRaftActorContext.MockPayload(""));
976 List<ReplicatedLogEntry> entries = Arrays.asList(
977 newReplicatedLogEntry(2, 101, "foo"));
979 // The new commitIndex is 101
980 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
981 follower.handleMessage(leaderActor, appendEntries);
983 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
985 assertTrue(syncStatus.isInitialSyncDone());
989 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
990 logStart("testHandleOutOfSequenceInstallSnapshot");
992 MockRaftActorContext context = createActorContext();
994 follower = createBehavior(context);
996 ByteString bsSnapshot = createSnapshot();
998 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
999 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1000 follower.handleMessage(leaderActor, installSnapshot);
1002 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1003 InstallSnapshotReply.class);
1005 assertEquals("isSuccess", false, reply.isSuccess());
1006 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1007 assertEquals("getTerm", 1, reply.getTerm());
1008 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1010 assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1014 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1015 MockRaftActorContext context = createActorContext();
1017 Stopwatch stopwatch = Stopwatch.createStarted();
1019 follower = createBehavior(context);
1021 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1023 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1025 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1027 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1028 assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1032 public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1033 MockRaftActorContext context = createActorContext();
1034 context.setConfigParams(new DefaultConfigParamsImpl() {
1036 public FiniteDuration getElectionTimeOutInterval() {
1037 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1041 context.setRaftPolicy(createRaftPolicy(false, false));
1043 follower = createBehavior(context);
1045 TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1046 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1047 assertSame("handleMessage result", follower, newBehavior);
1051 public void testFollowerSchedulesElectionIfNonVoting() {
1052 MockRaftActorContext context = createActorContext();
1053 context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1054 ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1055 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1056 ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1058 follower = new Follower(context, "leader", (short)1);
1060 ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1061 ElectionTimeout.class);
1062 RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1063 assertSame("handleMessage result", follower, newBehavior);
1064 assertNull("Expected null leaderId", follower.getLeaderId());
1068 public void testElectionScheduledWhenAnyRaftRPCReceived() {
1069 MockRaftActorContext context = createActorContext();
1070 follower = createBehavior(context);
1071 follower.handleMessage(leaderActor, new RaftRPC() {
1072 private static final long serialVersionUID = 1L;
1075 public long getTerm() {
1079 verify(follower).scheduleElection(any(FiniteDuration.class));
1083 public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1084 MockRaftActorContext context = createActorContext();
1085 follower = createBehavior(context);
1086 follower.handleMessage(leaderActor, "non-raft-rpc");
1087 verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1090 public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
1091 int snapshotLength = bs.size();
1093 int size = chunkSize;
1094 if (chunkSize > snapshotLength) {
1095 size = snapshotLength;
1097 if (start + chunkSize > snapshotLength) {
1098 size = snapshotLength - start;
1102 byte[] nextChunk = new byte[size];
1103 bs.copyTo(nextChunk, start, 0, size);
1107 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1108 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1109 expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1112 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1113 String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1114 boolean expForceInstallSnapshot) {
1116 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1117 AppendEntriesReply.class);
1119 assertEquals("isSuccess", expSuccess, reply.isSuccess());
1120 assertEquals("getTerm", expTerm, reply.getTerm());
1121 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1122 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1123 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1124 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1125 assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1129 private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1130 return new SimpleReplicatedLogEntry(index, term,
1131 new MockRaftActorContext.MockPayload(data));
1134 private ByteString createSnapshot() {
1135 HashMap<String, String> followerSnapshot = new HashMap<>();
1136 followerSnapshot.put("1", "A");
1137 followerSnapshot.put("2", "B");
1138 followerSnapshot.put("3", "C");
1140 return toByteString(followerSnapshot);
1144 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1145 ActorRef actorRef, RaftRPC rpc) throws Exception {
1146 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1148 String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1149 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1153 protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1155 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1156 assertEquals("isSuccess", true, reply.isSuccess());