1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.Snapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
32 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
37 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
39 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import scala.concurrent.duration.FiniteDuration;
// Unit tests for the Follower Raft behavior, built on top of AbstractRaftActorBehaviorTest.
43 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Test actor backed by MessageCollectorActor; createActorContext() hands it to the mock
// context, and tests read the messages it collects (ElectionTimeout, ApplySnapshot, ...).
45 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
46 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
// Test actor passed as the sender to handleMessage(); tests read the follower's replies from it.
48 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
49 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
// Behavior under test; assigned by each test and null-checked in tearDown().
51 private RaftActorBehavior follower;
// Payload version set on every context built by createActorContext(ActorRef) and
// expected back in replies by expectAndVerifyAppendEntriesReply().
53 private final short payloadVersion = 5;
// Per-test teardown. Only the null guard on the behavior under test is visible here.
// NOTE(review): the guarded statements and the rest of the method are on lines missing
// from this extract — confirm against the full file.
57 public void tearDown() throws Exception {
58 if(follower != null) {
// Factory hook: wraps the supplied context in a new Follower behavior.
66 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
67 return new Follower(actorContext);
// Convenience overload: builds the mock context around this test's followerActor.
71 protected MockRaftActorContext createActorContext() {
72 return createActorContext(followerActor);
// Builds a MockRaftActorContext named "follower" around the given actor and sets this
// test's payloadVersion on it.
// NOTE(review): the return statement is on a line missing from this extract.
76 protected MockRaftActorContext createActorContext(ActorRef actorRef){
77 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
78 context.setPayloadVersion(payloadVersion );
// Verifies that constructing a Follower leads to an ElectionTimeout message being
// collected by followerActor.
83 public void testThatAnElectionTimeoutIsTriggered(){
84 MockRaftActorContext actorContext = createActorContext();
85 follower = new Follower(actorContext);
// The third argument is six times the configured election timeout interval in milliseconds
// (presumably the maximum time to wait — confirm against MessageCollectorActor).
87 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
88 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
// Verifies that handling an ElectionTimeout message makes the Follower return a
// Candidate behavior.
92 public void testHandleElectionTimeout(){
93 logStart("testHandleElectionTimeout");
95 follower = new Follower(createActorContext());
97 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
99 assertTrue(raftBehavior instanceof Candidate);
// Verifies that a RequestVote carrying the follower's current term is granted when the
// follower has not voted yet (votedFor is null), and that the reply echoes that term.
// NOTE(review): `term` is declared on a line missing from this extract.
103 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
104 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
106 RaftActorContext context = createActorContext();
// Current term is `term`, with no vote recorded.
108 context.getTermInformation().update(term, null);
110 follower = createBehavior(context);
112 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
114 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
116 assertEquals("isVoteGranted", true, reply.isVoteGranted());
117 assertEquals("getTerm", term, reply.getTerm());
// Verifies that a RequestVote for the current term is refused when the follower has
// already recorded a vote for a different id ("test") than the requester ("candidate").
// NOTE(review): `term` is declared on a line missing from this extract.
121 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
122 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
124 RaftActorContext context = createActorContext();
// Current term is `term`, with the vote already recorded for "test".
126 context.getTermInformation().update(term, "test");
128 follower = createBehavior(context);
130 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
132 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
134 assertEquals("isVoteGranted", false, reply.isVoteGranted());
// Verifies the first AppendEntries handled by a new Follower: followerActor receives a
// FollowerInitialSyncUpStatus with isInitialSyncDone() false, and leaderActor receives a
// successful AppendEntriesReply.
139 public void testHandleFirstAppendEntries() throws Exception {
140 logStart("testHandleFirstAppendEntries");
142 MockRaftActorContext context = createActorContext();
// Reset the log so it holds exactly one entry (term 1, index 100) with snapshot index 99;
// the size assertion below confirms the single entry.
143 context.getReplicatedLog().clear(0,2);
144 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
145 context.getReplicatedLog().setSnapshotIndex(99);
147 List<ReplicatedLogEntry> entries = Arrays.asList(
148 newReplicatedLogEntry(2, 101, "foo"));
150 Assert.assertEquals(1, context.getReplicatedLog().size());
152 // The new commitIndex is 101
153 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
155 follower = createBehavior(context);
156 follower.handleMessage(leaderActor, appendEntries);
158 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
159 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
161 assertFalse(syncStatus.isInitialSyncDone());
162 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies the first AppendEntries when it carries -1 for both the third and fourth
// constructor arguments and the context's log is left as created: the sync-up status
// reports not done and the AppendEntriesReply is unsuccessful.
166 public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
// Label corrected to this test's own name (it was copy-pasted from testHandleFirstAppendEntries).
167 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOne");
169 MockRaftActorContext context = createActorContext();
171 List<ReplicatedLogEntry> entries = Arrays.asList(
172 newReplicatedLogEntry(2, 101, "foo"));
174 // The new commitIndex is 101
175 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
177 follower = createBehavior(context);
178 follower.handleMessage(leaderActor, appendEntries);
180 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
181 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
183 assertFalse(syncStatus.isInitialSyncDone());
184 assertFalse("append entries reply should be false", reply.isSuccess());
// Verifies the first AppendEntries with -1 for the third and fourth constructor arguments
// when the log already holds an entry at index 100 (the message's seventh argument):
// the sync-up status reports not done and the AppendEntriesReply is successful.
188 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
// Label corrected to this test's own name (it was copy-pasted from testHandleFirstAppendEntries).
189 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
191 MockRaftActorContext context = createActorContext();
// Log holds a single entry (term 1, index 100); snapshot index is 99.
192 context.getReplicatedLog().clear(0,2);
193 context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
194 context.getReplicatedLog().setSnapshotIndex(99);
196 List<ReplicatedLogEntry> entries = Arrays.asList(
197 newReplicatedLogEntry(2, 101, "foo"));
199 // The new commitIndex is 101
200 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
202 follower = createBehavior(context);
203 follower.handleMessage(leaderActor, appendEntries);
205 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
206 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
208 assertFalse(syncStatus.isInitialSyncDone());
209 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies the first AppendEntries with -1 for the third and fourth constructor arguments
// when the log's snapshot index is 100 (the message's seventh argument) and the new entry
// is at index 101: the sync-up status reports not done and the reply is successful.
213 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
// Label corrected to this test's own name (it was copy-pasted from testHandleFirstAppendEntries).
214 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
216 MockRaftActorContext context = createActorContext();
// No entry is appended after the clear; only the snapshot index (100) is set.
217 context.getReplicatedLog().clear(0,2);
218 context.getReplicatedLog().setSnapshotIndex(100);
220 List<ReplicatedLogEntry> entries = Arrays.asList(
221 newReplicatedLogEntry(2, 101, "foo"));
223 // The new commitIndex is 101
224 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
226 follower = createBehavior(context);
227 follower.handleMessage(leaderActor, appendEntries);
229 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
230 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
232 assertFalse(syncStatus.isInitialSyncDone());
233 assertTrue("append entries reply should be true", reply.isSuccess());
// Verifies the first AppendEntries with -1 for the third and fourth constructor arguments
// when the snapshot index is 100 but the new entry is at index 105, leaving a gap after
// the snapshot: the sync-up status reports not done and the reply is unsuccessful.
237 public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
// Label corrected to this test's own name (it was copy-pasted from testHandleFirstAppendEntries).
238 logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing");
240 MockRaftActorContext context = createActorContext();
// No entry is appended after the clear; only the snapshot index (100) is set.
241 context.getReplicatedLog().clear(0,2);
242 context.getReplicatedLog().setSnapshotIndex(100);
244 List<ReplicatedLogEntry> entries = Arrays.asList(
245 newReplicatedLogEntry(2, 105, "foo"));
247 // The new commitIndex is 105
248 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
250 follower = createBehavior(context);
251 follower.handleMessage(leaderActor, appendEntries);
253 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
254 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
256 assertFalse(syncStatus.isInitialSyncDone());
257 assertFalse("append entries reply should be false", reply.isSuccess());
// Walks a Follower through initial sync-up with a single leader in three steps:
// 1) the first AppendEntries yields a status with isInitialSyncDone() false;
// 2) after the context is advanced to index 101, the next AppendEntries yields a status
//    with isInitialSyncDone() true;
// 3) repeating that message yields no further status message.
261 public void testHandleSyncUpAppendEntries() throws Exception {
262 logStart("testHandleSyncUpAppendEntries");
264 MockRaftActorContext context = createActorContext();
266 List<ReplicatedLogEntry> entries = Arrays.asList(
267 newReplicatedLogEntry(2, 101, "foo"));
269 // The new commitIndex is 101
270 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
272 follower = createBehavior(context);
273 follower.handleMessage(leaderActor, appendEntries);
275 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
277 assertFalse(syncStatus.isInitialSyncDone());
279 // Clear all the messages
280 followerActor.underlyingActor().clear();
// Advance the follower's own state to index 101 before the second message.
282 context.setLastApplied(101);
283 context.setCommitIndex(101);
284 setLastLogEntry(context, 1, 101,
285 new MockRaftActorContext.MockPayload(""));
287 entries = Arrays.asList(
288 newReplicatedLogEntry(2, 101, "foo"));
290 // The new commitIndex is 102
291 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
292 follower.handleMessage(leaderActor, appendEntries);
294 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
296 assertTrue(syncStatus.isInitialSyncDone());
298 followerActor.underlyingActor().clear();
300 // Sending the same message again should not generate another message
302 follower.handleMessage(leaderActor, appendEntries);
// getFirstMatching (unlike expectFirstMatching) is used so that an absent message can be
// observed as null.
304 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
306 assertNull(syncStatus);
// Verifies that when a different leader id ("leader-2") sends AppendEntries before the
// initial sync-up with "leader-1" has completed, followerActor receives a fresh
// FollowerInitialSyncUpStatus with isInitialSyncDone() false.
311 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
312 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
314 MockRaftActorContext context = createActorContext();
316 List<ReplicatedLogEntry> entries = Arrays.asList(
317 newReplicatedLogEntry(2, 101, "foo"));
319 // The new commitIndex is 101
320 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
322 follower = createBehavior(context);
323 follower.handleMessage(leaderActor, appendEntries);
325 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
327 assertFalse(syncStatus.isInitialSyncDone());
329 // Clear all the messages
330 followerActor.underlyingActor().clear();
332 context.setLastApplied(100);
333 setLastLogEntry(context, 1, 100,
334 new MockRaftActorContext.MockPayload(""));
336 entries = Arrays.asList(
337 newReplicatedLogEntry(2, 101, "foo"));
// NOTE(review): in the sibling tests the commitIndex comment matches the sixth constructor
// argument, which is 46 here; 45 is the third argument. Confirm which value is meant.
339 // leader-2 is becoming the leader now and it says the commitIndex is 45
340 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
341 follower.handleMessage(leaderActor, appendEntries);
343 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
345 // We get a new message saying initial status is not done
346 assertFalse(syncStatus.isInitialSyncDone());
// Verifies that after initial sync-up with "leader-1" has completed (status true), an
// AppendEntries from a different leader id ("leader-2") causes a new
// FollowerInitialSyncUpStatus with isInitialSyncDone() false.
352 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
353 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
355 MockRaftActorContext context = createActorContext();
357 List<ReplicatedLogEntry> entries = Arrays.asList(
358 newReplicatedLogEntry(2, 101, "foo"));
360 // The new commitIndex is 101
361 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
363 follower = createBehavior(context);
364 follower.handleMessage(leaderActor, appendEntries);
366 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
368 assertFalse(syncStatus.isInitialSyncDone());
370 // Clear all the messages
371 followerActor.underlyingActor().clear();
// Advance the follower's own state to index 101 so the next message completes sync-up.
373 context.setLastApplied(101);
374 context.setCommitIndex(101);
375 setLastLogEntry(context, 1, 101,
376 new MockRaftActorContext.MockPayload(""));
378 entries = Arrays.asList(
379 newReplicatedLogEntry(2, 101, "foo"));
381 // The new commitIndex is 102
382 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
383 follower.handleMessage(leaderActor, appendEntries);
385 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
387 assertTrue(syncStatus.isInitialSyncDone());
389 // Clear all the messages
390 followerActor.underlyingActor().clear();
392 context.setLastApplied(100);
393 setLastLogEntry(context, 1, 100,
394 new MockRaftActorContext.MockPayload(""));
396 entries = Arrays.asList(
397 newReplicatedLogEntry(2, 101, "foo"));
// NOTE(review): in the sibling tests the commitIndex comment matches the sixth constructor
// argument, which is 46 here; 45 is the third argument. Confirm which value is meant.
399 // leader-2 is becoming the leader now and it says the commitIndex is 45
400 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
401 follower.handleMessage(leaderActor, appendEntries);
403 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
405 // We get a new message saying initial status is not done
406 assertFalse(syncStatus.isInitialSyncDone());
412 * This test verifies that when an AppendEntries RPC is received by a RaftActor
413 * with a commitIndex that is greater than what has been applied to the
414 * state machine of the RaftActor, the RaftActor applies the state and
415 * sets its current applied state to the commitIndex of the sender.
// Starts with lastApplied at 100 and asserts that, after handling an AppendEntries whose
// commit index is 101, the context's lastApplied has moved to 101.
420 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
421 logStart("testHandleAppendEntriesWithNewerCommitIndex");
423 MockRaftActorContext context = createActorContext();
// Follower state before the message: last applied 100, last log entry (term 1, index 100),
// snapshot index 99.
425 context.setLastApplied(100);
426 setLastLogEntry(context, 1, 100,
427 new MockRaftActorContext.MockPayload(""));
428 context.getReplicatedLog().setSnapshotIndex(99);
430 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
431 newReplicatedLogEntry(2, 101, "foo"));
433 // The new commitIndex is 101
434 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
436 follower = createBehavior(context);
437 follower.handleMessage(leaderActor, appendEntries);
439 assertEquals("getLastApplied", 101L, context.getLastApplied());
443 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
444 * which does not match the term that is in RaftActors log entry at prevLogIndex
445 * then the RaftActor does not change its state and it returns a failure.
// Sends an AppendEntries with term 100 and a null entries list to a follower whose term
// is 95; asserts the same behavior instance is returned and the reply is unsuccessful.
450 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
451 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
453 MockRaftActorContext context = createActorContext();
455 // First set the receivers term to lower number
456 context.getTermInformation().update(95, "test");
458 // AppendEntries is now sent with a bigger term
459 // this will set the receivers term to be the same as the sender's term
460 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
462 follower = createBehavior(context);
464 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// The follower must stay in the same behavior instance.
466 Assert.assertSame(follower, newBehavior);
468 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
469 AppendEntriesReply.class);
471 assertEquals("isSuccess", false, reply.isSuccess());
475 * This test verifies that when a new AppendEntries message is received with
476 * new entries and the logs of the sender and receiver match that the new
477 * entries get added to the log and the log is incremented by the number of
478 * entries received in appendEntries
// Starts from a log with entries 0..2, sends entries 3 and 4, and asserts that both are
// appended, that the behavior reports the leader id and leader payload version from the
// message, and that the reply is successful with last term 1 / last index 4.
483 public void testHandleAppendEntriesAddNewEntries() {
484 logStart("testHandleAppendEntriesAddNewEntries");
486 MockRaftActorContext context = createActorContext();
488 // First set the receivers term to lower number
489 context.getTermInformation().update(1, "test");
491 // Prepare the receivers log
492 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
493 log.append(newReplicatedLogEntry(1, 0, "zero"));
494 log.append(newReplicatedLogEntry(1, 1, "one"));
495 log.append(newReplicatedLogEntry(1, 2, "two"));
497 context.setReplicatedLog(log);
499 // Prepare the entries to be sent with AppendEntries
500 List<ReplicatedLogEntry> entries = new ArrayList<>();
501 entries.add(newReplicatedLogEntry(1, 3, "three"));
502 entries.add(newReplicatedLogEntry(1, 4, "four"));
504 // Send appendEntries with the same term as was set on the receiver
505 // before the new behavior was created (1 in this case)
506 // This will not work for a Candidate because as soon as a Candidate
507 // is created it increments the term
508 short leaderPayloadVersion = 10;
509 String leaderId = "leader-1";
510 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
512 follower = createBehavior(context);
514 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
516 Assert.assertSame(follower, newBehavior);
// The log must now end at index 4 and contain exactly the entries that were sent.
518 assertEquals("Next index", 5, log.last().getIndex() + 1);
519 assertEquals("Entry 3", entries.get(0), log.get(3));
520 assertEquals("Entry 4", entries.get(1), log.get(4));
522 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
523 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
// Helper argument order: term, success, follower id, log last term, log last index.
525 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
529 * This test verifies that when a new AppendEntries message is received with
530 * new entries and the logs of the sender and receiver are out-of-sync that
531 * the log is first corrected by removing the out of sync entries from the
532 * log and then adding in the new entries sent with the AppendEntries message
// Starts from a log with term-1 entries 0..2, sends term-2 entries for indexes 2 and 3,
// and asserts that index 1 is untouched, indexes 2 and 3 hold the sent entries, and the
// reply is successful with last term 2 / last index 3.
535 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
536 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
538 MockRaftActorContext context = createActorContext();
540 // First set the receivers term to lower number
541 context.getTermInformation().update(1, "test");
543 // Prepare the receivers log
544 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
545 log.append(newReplicatedLogEntry(1, 0, "zero"));
546 log.append(newReplicatedLogEntry(1, 1, "one"));
547 log.append(newReplicatedLogEntry(1, 2, "two"));
549 context.setReplicatedLog(log);
551 // Prepare the entries to be sent with AppendEntries
552 List<ReplicatedLogEntry> entries = new ArrayList<>();
553 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
554 entries.add(newReplicatedLogEntry(2, 3, "three"));
// NOTE(review): the comment below says the message uses the receiver's term (1), but the
// AppendEntries is constructed with term 2 — the comment looks copied from the previous test.
556 // Send appendEntries with the same term as was set on the receiver
557 // before the new behavior was created (1 in this case)
558 // This will not work for a Candidate because as soon as a Candidate
559 // is created it increments the term
560 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
562 follower = createBehavior(context);
564 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
566 Assert.assertSame(follower, newBehavior);
568 // The entry at index 2 will be found out-of-sync with the leader
569 // and will be removed
570 // Then the two new entries will be added to the log
571 // Thus making the log to have 4 entries
572 assertEquals("Next index", 4, log.last().getIndex() + 1);
// NOTE(review): the commented-out assertion below duplicates the live "Entry 2" check
// further down and could be removed.
573 //assertEquals("Entry 2", entries.get(0), log.get(2));
575 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
577 // Check that the entry at index 2 has the new data
578 assertEquals("Entry 2", entries.get(0), log.get(2));
580 assertEquals("Entry 3", entries.get(1), log.get(3));
// Helper argument order: term, success, follower id, log last term, log last index.
582 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
// Starts from a log with entries 0..2 and sends entry 4 with 3 as the third constructor
// argument (an index the log does not contain); asserts the behavior is unchanged and the
// reply is unsuccessful, reporting last term 1 / last index 2.
586 public void testHandleAppendEntriesPreviousLogEntryMissing(){
587 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
589 MockRaftActorContext context = createActorContext();
591 // Prepare the receivers log
592 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
593 log.append(newReplicatedLogEntry(1, 0, "zero"));
594 log.append(newReplicatedLogEntry(1, 1, "one"));
595 log.append(newReplicatedLogEntry(1, 2, "two"));
597 context.setReplicatedLog(log);
599 // Prepare the entries to be sent with AppendEntries
600 List<ReplicatedLogEntry> entries = new ArrayList<>();
601 entries.add(newReplicatedLogEntry(1, 4, "four"));
603 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
605 follower = createBehavior(context);
607 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
609 Assert.assertSame(follower, newBehavior);
// Helper argument order: term, success, follower id, log last term, log last index.
611 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
// Verifies that re-sending an entry the follower already has does not duplicate it:
// first the last entry alone is re-sent (log still ends at index 1), then the same entry
// plus a new one (log ends at index 2). Both replies must be successful.
615 public void testHandleAppendEntriesWithExistingLogEntry() {
616 logStart("testHandleAppendEntriesWithExistingLogEntry");
618 MockRaftActorContext context = createActorContext();
620 context.getTermInformation().update(1, "test");
622 // Prepare the receivers log
623 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
624 log.append(newReplicatedLogEntry(1, 0, "zero"));
625 log.append(newReplicatedLogEntry(1, 1, "one"));
627 context.setReplicatedLog(log);
629 // Send the last entry again.
630 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
632 follower = createBehavior(context);
634 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
636 assertEquals("Next index", 2, log.last().getIndex() + 1);
637 assertEquals("Entry 1", entries.get(0), log.get(1));
639 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
641 // Send the last entry again and also a new one.
643 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so the helper below reads the reply to the second message.
645 leaderActor.underlyingActor().clear();
646 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
648 assertEquals("Next index", 3, log.last().getIndex() + 1);
649 assertEquals("Entry 1", entries.get(0), log.get(1));
650 assertEquals("Entry 2", entries.get(1), log.get(2));
652 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
// Gives the follower an empty log whose snapshot index/term are 3/1, sends entry 4 with
// 3 as the third constructor argument, and asserts the behavior is unchanged and the
// reply is successful with last term 1 / last index 4.
656 public void testHandleAppendEntriesAfterInstallingSnapshot(){
// Label corrected to match the method name (it previously omitted "Entries").
657 logStart("testHandleAppendEntriesAfterInstallingSnapshot");
659 MockRaftActorContext context = createActorContext();
661 // Prepare the receivers log
662 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
664 // Set up a log as if it has been snapshotted
665 log.setSnapshotIndex(3);
666 log.setSnapshotTerm(1);
668 context.setReplicatedLog(log);
670 // Prepare the entries to be sent with AppendEntries
671 List<ReplicatedLogEntry> entries = new ArrayList<>();
672 entries.add(newReplicatedLogEntry(1, 4, "four"));
674 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
676 follower = createBehavior(context);
678 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
680 Assert.assertSame(follower, newBehavior);
// Helper argument order: term, success, follower id, log last term, log last index.
682 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
687 * This test verifies that when InstallSnapshot is received by
688 * the follower it is applied correctly.
// Sends a snapshot to the follower chunk by chunk and asserts that an ApplySnapshot
// carrying the reassembled state reaches followerActor, that one successful
// InstallSnapshotReply per chunk reaches leaderActor, and that the follower's snapshot
// tracker is null afterwards.
// NOTE(review): `chunkSize`, `offset` and `chunkIndex` are declared/updated on lines
// missing from this extract.
693 public void testHandleInstallSnapshot() throws Exception {
694 logStart("testHandleInstallSnapshot");
696 MockRaftActorContext context = createActorContext();
698 follower = createBehavior(context);
700 ByteString bsSnapshot = createSnapshot();
702 int snapshotLength = bsSnapshot.size();
// Ceiling division: number of chunkSize-sized chunks needed to cover the snapshot.
704 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
705 int lastIncludedIndex = 1;
707 InstallSnapshot lastInstallSnapshot = null;
709 for(int i = 0; i < totalChunks; i++) {
710 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
711 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
712 chunkData, chunkIndex, totalChunks);
713 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the offset advances by a literal 50 rather than chunkSize — presumably
// chunkSize is 50; confirm.
714 offset = offset + 50;
719 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
720 ApplySnapshot.class);
721 Snapshot snapshot = applySnapshot.getSnapshot();
722 assertNotNull(lastInstallSnapshot);
// The applied snapshot's index/term fields must match the last InstallSnapshot message.
723 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
724 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
725 snapshot.getLastAppliedTerm());
726 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
727 snapshot.getLastAppliedIndex());
728 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
729 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
731 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
732 leaderActor, InstallSnapshotReply.class);
733 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
// Replies are expected in chunk order, each successful and carrying term 1 and this follower's id.
736 for(InstallSnapshotReply reply: replies) {
737 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
738 assertEquals("getTerm", 1, reply.getTerm());
739 assertEquals("isSuccess", true, reply.isSuccess());
740 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
743 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
748 * Verify that when an AppendEntries is sent to a follower during a snapshot install
749 * the Follower short-circuits the processing of the AppendEntries message.
// Starts a multi-chunk snapshot install by sending only the first chunk, then sends a
// mocked AppendEntries. Asserts the reply carries the follower's current log index, log
// term and current term, and that getPrevLogIndex() was never called on the message.
// NOTE(review): `chunkSize` is declared on a line missing from this extract.
754 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
755 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
757 MockRaftActorContext context = createActorContext();
759 follower = createBehavior(context);
761 ByteString bsSnapshot = createSnapshot();
762 int snapshotLength = bsSnapshot.size();
// Ceiling division: number of chunkSize-sized chunks needed to cover the snapshot.
764 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
765 int lastIncludedIndex = 1;
767 // Check that snapshot installation is not in progress
768 assertNull(((Follower) follower).getSnapshotTracker());
770 // Make sure that we have more than 1 chunk to send
771 assertTrue(totalChunks > 1);
773 // Send an install snapshot with the first chunk to start the process of installing a snapshot
774 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
775 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
776 chunkData, 1, totalChunks));
778 // Check if snapshot installation is in progress now
779 assertNotNull(((Follower) follower).getSnapshotTracker());
781 // Send an append entry
// A Mockito mock is used so the accessor calls made on the message can be verified below;
// only getTerm() is stubbed.
782 AppendEntries appendEntries = mock(AppendEntries.class);
783 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
785 follower.handleMessage(leaderActor, appendEntries);
787 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
788 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
789 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
790 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
792 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
793 verify(appendEntries, never()).getPrevLogIndex();
// Installs a full snapshot chunk by chunk and asserts the follower reports initial
// sync-up as not done; then advances the context to index 101, sends an AppendEntries,
// and asserts the follower reports initial sync-up as done.
// NOTE(review): `chunkSize`, `offset` and `chunkIndex` are declared/updated on lines
// missing from this extract.
798 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
// Label corrected to match the full method name (the "FollowedByAppendEntries" suffix was missing).
799 logStart("testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries");
801 MockRaftActorContext context = createActorContext();
803 follower = createBehavior(context);
805 ByteString bsSnapshot = createSnapshot();
807 int snapshotLength = bsSnapshot.size();
// Ceiling division: number of chunkSize-sized chunks needed to cover the snapshot.
809 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
810 int lastIncludedIndex = 1;
812 InstallSnapshot lastInstallSnapshot = null;
814 for(int i = 0; i < totalChunks; i++) {
815 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
816 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
817 chunkData, chunkIndex, totalChunks);
818 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the offset advances by a literal 50 rather than chunkSize — presumably
// chunkSize is 50; confirm.
819 offset = offset + 50;
824 FollowerInitialSyncUpStatus syncStatus =
825 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
827 assertFalse(syncStatus.isInitialSyncDone());
829 // Clear all the messages
830 followerActor.underlyingActor().clear();
// Advance the follower's own state to index 101 before sending the AppendEntries.
832 context.setLastApplied(101);
833 context.setCommitIndex(101);
834 setLastLogEntry(context, 1, 101,
835 new MockRaftActorContext.MockPayload(""));
837 List<ReplicatedLogEntry> entries = Arrays.asList(
838 newReplicatedLogEntry(2, 101, "foo"));
840 // The new commitIndex is 102
841 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
842 follower.handleMessage(leaderActor, appendEntries);
844 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
846 assertTrue(syncStatus.isInitialSyncDone());
// Sends chunk 3 of 3 as the very first InstallSnapshot message and asserts the reply is
// unsuccessful with chunk index -1, term 1 and this follower's id, and that no snapshot
// tracker remains on the follower.
850 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
851 logStart("testHandleOutOfSequenceInstallSnapshot");
853 MockRaftActorContext context = createActorContext();
855 follower = createBehavior(context);
857 ByteString bsSnapshot = createSnapshot();
// The last two arguments are the chunk index and total chunks (3 of 3).
859 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
860 getNextChunk(bsSnapshot, 10, 50), 3, 3);
861 follower.handleMessage(leaderActor, installSnapshot);
863 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
864 InstallSnapshotReply.class);
866 assertEquals("isSuccess", false, reply.isSuccess());
867 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
868 assertEquals("getTerm", 1, reply.getTerm());
869 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
871 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
// Measures how long it takes for an ElectionTimeout to reach followerActor after the
// Follower is created, and asserts it is shorter than one configured election timeout
// interval. NOTE(review): presumably the default mock context has no peers, as the test
// name states — not visible here.
875 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
876 MockRaftActorContext context = createActorContext();
// Start timing just before the behavior is created.
878 Stopwatch stopwatch = Stopwatch.createStarted();
880 follower = createBehavior(context);
882 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
884 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
886 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
// Configures a 100 ms election timeout and a raft policy created with (false, false),
// then asserts that no ElectionTimeout reaches followerActor.
890 public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
891 MockRaftActorContext context = createActorContext();
// Anonymous config subclass that shortens the election timeout to 100 ms.
892 context.setConfigParams(new DefaultConfigParamsImpl(){
894 public FiniteDuration getElectionTimeOutInterval() {
895 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
// NOTE(review): presumably the first `false` disables automatic elections — confirm
// against createRaftPolicy in the base class.
899 context.setRaftPolicy(createRaftPolicy(false, false));
901 follower = createBehavior(context);
// The last argument is 500 (presumably milliseconds to watch for), five times the
// configured timeout.
903 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
// Returns the slice of `bs` beginning at `start` with a length of at most `chunkSize`;
// the length is reduced when the chunk would not fit in the snapshot.
// NOTE(review): the declaration of `start` (presumably initialised from `offset`) and the
// braces/else linking the two `if` statements are on lines missing from this extract.
// NOTE(review): when chunkSize > snapshotLength the size becomes the whole snapshot length,
// so a non-zero start would push `start + size` past the end of `bs` — confirm callers
// only use start 0 in that case.
907 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
908 int snapshotLength = bs.size();
910 int size = chunkSize;
// Chunk larger than the whole snapshot: take everything.
911 if (chunkSize > snapshotLength) {
912 size = snapshotLength;
// Final, partial chunk: take only what remains after `start`.
914 if ((start + chunkSize) > snapshotLength) {
915 size = snapshotLength - start;
918 return bs.substring(start, start + size);
// Waits for the first AppendEntriesReply collected by leaderActor and asserts each of its
// fields against the expected values. Note the argument order: expected last log TERM
// comes before expected last log INDEX. The payload version is always compared with this
// test's payloadVersion field.
921 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
922 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
924 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
925 AppendEntriesReply.class);
927 assertEquals("isSuccess", expSuccess, reply.isSuccess());
928 assertEquals("getTerm", expTerm, reply.getTerm());
929 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
930 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
931 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
932 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
// Builds a MockReplicatedLogEntry with the given term and index, wrapping `data` in a
// MockPayload.
935 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
936 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
937 new MockRaftActorContext.MockPayload(data));
// Builds the snapshot fixture: a three-entry map ("1"->"A", "2"->"B", "3"->"C") converted
// to a ByteString via toByteString().
940 private ByteString createSnapshot(){
941 HashMap<String, String> followerSnapshot = new HashMap<>();
942 followerSnapshot.put("1", "A");
943 followerSnapshot.put("2", "B");
944 followerSnapshot.put("3", "C");
946 return toByteString(followerSnapshot);
// Runs the base-class check and then additionally asserts the recorded votedFor: the
// candidate id when the RPC is a RequestVote, otherwise null.
950 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
951 ActorRef actorRef, RaftRPC rpc) throws Exception {
952 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
954 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
955 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
// Expects an AppendEntriesReply on the given reply actor and asserts it is successful.
// NOTE(review): the remainder of the signature and the end of the method are on lines
// missing from this extract.
959 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
961 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
962 assertEquals("isSuccess", true, reply.isSuccess());