BUG 4212 : Follower should not reschedule election timeout in certain cases.
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.Props;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Stopwatch;
24 import com.google.protobuf.ByteString;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftActorContext;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.Snapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
40 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import scala.concurrent.duration.FiniteDuration;
50
51 public class FollowerTest extends AbstractRaftActorBehaviorTest {
52
53     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
54             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
55
56     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
57             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
58
59     private RaftActorBehavior follower;
60
61     private final short payloadVersion = 5;
62
63     @Override
64     @After
65     public void tearDown() throws Exception {
66         if(follower != null) {
67             follower.close();
68         }
69
70         super.tearDown();
71     }
72
73     @Override
74     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
75         return new TestFollower(actorContext);
76     }
77
78     @Override
79     protected  MockRaftActorContext createActorContext() {
80         return createActorContext(followerActor);
81     }
82
83     @Override
84     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
85         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
86         context.setPayloadVersion(payloadVersion );
87         return context;
88     }
89
90     private int getElectionTimeoutCount(RaftActorBehavior follower){
91         if(follower instanceof TestFollower){
92             return ((TestFollower) follower).getElectionTimeoutCount();
93         }
94         return -1;
95     }
96
97     @Test
98     public void testThatAnElectionTimeoutIsTriggered(){
99         MockRaftActorContext actorContext = createActorContext();
100         follower = new Follower(actorContext);
101
102         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
103                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
104     }
105
106     @Test
107     public void testHandleElectionTimeout(){
108         logStart("testHandleElectionTimeout");
109
110         follower = new Follower(createActorContext());
111
112         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
113
114         assertTrue(raftBehavior instanceof Candidate);
115     }
116
117     @Test
118     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
119         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
120
121         RaftActorContext context = createActorContext();
122         long term = 1000;
123         context.getTermInformation().update(term, null);
124
125         follower = createBehavior(context);
126
127         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
128
129         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
130
131         assertEquals("isVoteGranted", true, reply.isVoteGranted());
132         assertEquals("getTerm", term, reply.getTerm());
133         assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
134     }
135
136     @Test
137     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
138         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
139
140         RaftActorContext context = createActorContext();
141         long term = 1000;
142         context.getTermInformation().update(term, "test");
143
144         follower = createBehavior(context);
145
146         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
147
148         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
149
150         assertEquals("isVoteGranted", false, reply.isVoteGranted());
151         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
152     }
153
154
155     @Test
156     public void testHandleFirstAppendEntries() throws Exception {
157         logStart("testHandleFirstAppendEntries");
158
159         MockRaftActorContext context = createActorContext();
160         context.getReplicatedLog().clear(0,2);
161         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
162         context.getReplicatedLog().setSnapshotIndex(99);
163
164         List<ReplicatedLogEntry> entries = Arrays.asList(
165                 newReplicatedLogEntry(2, 101, "foo"));
166
167         Assert.assertEquals(1, context.getReplicatedLog().size());
168
169         // The new commitIndex is 101
170         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
171
172         follower = createBehavior(context);
173         follower.handleMessage(leaderActor, appendEntries);
174
175         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
176         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
177
178         assertFalse(syncStatus.isInitialSyncDone());
179         assertTrue("append entries reply should be true", reply.isSuccess());
180     }
181
182     @Test
183     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
184         logStart("testHandleFirstAppendEntries");
185
186         MockRaftActorContext context = createActorContext();
187
188         List<ReplicatedLogEntry> entries = Arrays.asList(
189                 newReplicatedLogEntry(2, 101, "foo"));
190
191         // The new commitIndex is 101
192         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
193
194         follower = createBehavior(context);
195         follower.handleMessage(leaderActor, appendEntries);
196
197         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
198         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
199
200         assertFalse(syncStatus.isInitialSyncDone());
201         assertFalse("append entries reply should be false", reply.isSuccess());
202     }
203
204     @Test
205     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
206         logStart("testHandleFirstAppendEntries");
207
208         MockRaftActorContext context = createActorContext();
209         context.getReplicatedLog().clear(0,2);
210         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
211         context.getReplicatedLog().setSnapshotIndex(99);
212
213         List<ReplicatedLogEntry> entries = Arrays.asList(
214                 newReplicatedLogEntry(2, 101, "foo"));
215
216         // The new commitIndex is 101
217         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
218
219         follower = createBehavior(context);
220         follower.handleMessage(leaderActor, appendEntries);
221
222         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
223         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
224
225         assertFalse(syncStatus.isInitialSyncDone());
226         assertTrue("append entries reply should be true", reply.isSuccess());
227     }
228
229     @Test
230     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
231         logStart("testHandleFirstAppendEntries");
232
233         MockRaftActorContext context = createActorContext();
234         context.getReplicatedLog().clear(0,2);
235         context.getReplicatedLog().setSnapshotIndex(100);
236
237         List<ReplicatedLogEntry> entries = Arrays.asList(
238                 newReplicatedLogEntry(2, 101, "foo"));
239
240         // The new commitIndex is 101
241         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
242
243         follower = createBehavior(context);
244         follower.handleMessage(leaderActor, appendEntries);
245
246         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
247         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
248
249         assertFalse(syncStatus.isInitialSyncDone());
250         assertTrue("append entries reply should be true", reply.isSuccess());
251     }
252
253     @Test
254     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
255         logStart("testHandleFirstAppendEntries");
256
257         MockRaftActorContext context = createActorContext();
258         context.getReplicatedLog().clear(0,2);
259         context.getReplicatedLog().setSnapshotIndex(100);
260
261         List<ReplicatedLogEntry> entries = Arrays.asList(
262                 newReplicatedLogEntry(2, 105, "foo"));
263
264         // The new commitIndex is 101
265         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
266
267         follower = createBehavior(context);
268         follower.handleMessage(leaderActor, appendEntries);
269
270         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
271         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
272
273         assertFalse(syncStatus.isInitialSyncDone());
274         assertFalse("append entries reply should be false", reply.isSuccess());
275     }
276
277     @Test
278     public void testHandleSyncUpAppendEntries() throws Exception {
279         logStart("testHandleSyncUpAppendEntries");
280
281         MockRaftActorContext context = createActorContext();
282
283         List<ReplicatedLogEntry> entries = Arrays.asList(
284                 newReplicatedLogEntry(2, 101, "foo"));
285
286         // The new commitIndex is 101
287         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
288
289         follower = createBehavior(context);
290         follower.handleMessage(leaderActor, appendEntries);
291
292         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
293
294         assertFalse(syncStatus.isInitialSyncDone());
295
296         // Clear all the messages
297         followerActor.underlyingActor().clear();
298
299         context.setLastApplied(101);
300         context.setCommitIndex(101);
301         setLastLogEntry(context, 1, 101,
302                 new MockRaftActorContext.MockPayload(""));
303
304         entries = Arrays.asList(
305                 newReplicatedLogEntry(2, 101, "foo"));
306
307         // The new commitIndex is 101
308         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
309         follower.handleMessage(leaderActor, appendEntries);
310
311         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
312
313         assertTrue(syncStatus.isInitialSyncDone());
314
315         followerActor.underlyingActor().clear();
316
317         // Sending the same message again should not generate another message
318
319         follower.handleMessage(leaderActor, appendEntries);
320
321         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
322
323         assertNull(syncStatus);
324
325     }
326
327     @Test
328     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
329         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
330
331         MockRaftActorContext context = createActorContext();
332
333         List<ReplicatedLogEntry> entries = Arrays.asList(
334                 newReplicatedLogEntry(2, 101, "foo"));
335
336         // The new commitIndex is 101
337         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
338
339         follower = createBehavior(context);
340         follower.handleMessage(leaderActor, appendEntries);
341
342         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
343
344         assertFalse(syncStatus.isInitialSyncDone());
345
346         // Clear all the messages
347         followerActor.underlyingActor().clear();
348
349         context.setLastApplied(100);
350         setLastLogEntry(context, 1, 100,
351                 new MockRaftActorContext.MockPayload(""));
352
353         entries = Arrays.asList(
354                 newReplicatedLogEntry(2, 101, "foo"));
355
356         // leader-2 is becoming the leader now and it says the commitIndex is 45
357         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
358         follower.handleMessage(leaderActor, appendEntries);
359
360         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
361
362         // We get a new message saying initial status is not done
363         assertFalse(syncStatus.isInitialSyncDone());
364
365     }
366
367
368     @Test
369     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
370         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
371
372         MockRaftActorContext context = createActorContext();
373
374         List<ReplicatedLogEntry> entries = Arrays.asList(
375                 newReplicatedLogEntry(2, 101, "foo"));
376
377         // The new commitIndex is 101
378         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
379
380         follower = createBehavior(context);
381         follower.handleMessage(leaderActor, appendEntries);
382
383         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
384
385         assertFalse(syncStatus.isInitialSyncDone());
386
387         // Clear all the messages
388         followerActor.underlyingActor().clear();
389
390         context.setLastApplied(101);
391         context.setCommitIndex(101);
392         setLastLogEntry(context, 1, 101,
393                 new MockRaftActorContext.MockPayload(""));
394
395         entries = Arrays.asList(
396                 newReplicatedLogEntry(2, 101, "foo"));
397
398         // The new commitIndex is 101
399         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
400         follower.handleMessage(leaderActor, appendEntries);
401
402         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
403
404         assertTrue(syncStatus.isInitialSyncDone());
405
406         // Clear all the messages
407         followerActor.underlyingActor().clear();
408
409         context.setLastApplied(100);
410         setLastLogEntry(context, 1, 100,
411                 new MockRaftActorContext.MockPayload(""));
412
413         entries = Arrays.asList(
414                 newReplicatedLogEntry(2, 101, "foo"));
415
416         // leader-2 is becoming the leader now and it says the commitIndex is 45
417         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
418         follower.handleMessage(leaderActor, appendEntries);
419
420         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
421
422         // We get a new message saying initial status is not done
423         assertFalse(syncStatus.isInitialSyncDone());
424
425     }
426
427
428     /**
429      * This test verifies that when an AppendEntries RPC is received by a RaftActor
430      * with a commitIndex that is greater than what has been applied to the
431      * state machine of the RaftActor, the RaftActor applies the state and
432      * sets it current applied state to the commitIndex of the sender.
433      *
434      * @throws Exception
435      */
436     @Test
437     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
438         logStart("testHandleAppendEntriesWithNewerCommitIndex");
439
440         MockRaftActorContext context = createActorContext();
441
442         context.setLastApplied(100);
443         setLastLogEntry(context, 1, 100,
444                 new MockRaftActorContext.MockPayload(""));
445         context.getReplicatedLog().setSnapshotIndex(99);
446
447         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
448                 newReplicatedLogEntry(2, 101, "foo"));
449
450         // The new commitIndex is 101
451         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
452
453         follower = createBehavior(context);
454         follower.handleMessage(leaderActor, appendEntries);
455
456         assertEquals("getLastApplied", 101L, context.getLastApplied());
457     }
458
459     /**
460      * This test verifies that when an AppendEntries is received a specific prevLogTerm
461      * which does not match the term that is in RaftActors log entry at prevLogIndex
462      * then the RaftActor does not change it's state and it returns a failure.
463      *
464      * @throws Exception
465      */
466     @Test
467     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
468         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
469
470         MockRaftActorContext context = createActorContext();
471
472         // First set the receivers term to lower number
473         context.getTermInformation().update(95, "test");
474
475         // AppendEntries is now sent with a bigger term
476         // this will set the receivers term to be the same as the sender's term
477         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
478
479         follower = createBehavior(context);
480
481         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
482
483         Assert.assertSame(follower, newBehavior);
484
485         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
486                 AppendEntriesReply.class);
487
488         assertEquals("isSuccess", false, reply.isSuccess());
489     }
490
491     /**
492      * This test verifies that when a new AppendEntries message is received with
493      * new entries and the logs of the sender and receiver match that the new
494      * entries get added to the log and the log is incremented by the number of
495      * entries received in appendEntries
496      *
497      * @throws Exception
498      */
499     @Test
500     public void testHandleAppendEntriesAddNewEntries() {
501         logStart("testHandleAppendEntriesAddNewEntries");
502
503         MockRaftActorContext context = createActorContext();
504
505         // First set the receivers term to lower number
506         context.getTermInformation().update(1, "test");
507
508         // Prepare the receivers log
509         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
510         log.append(newReplicatedLogEntry(1, 0, "zero"));
511         log.append(newReplicatedLogEntry(1, 1, "one"));
512         log.append(newReplicatedLogEntry(1, 2, "two"));
513
514         context.setReplicatedLog(log);
515
516         // Prepare the entries to be sent with AppendEntries
517         List<ReplicatedLogEntry> entries = new ArrayList<>();
518         entries.add(newReplicatedLogEntry(1, 3, "three"));
519         entries.add(newReplicatedLogEntry(1, 4, "four"));
520
521         // Send appendEntries with the same term as was set on the receiver
522         // before the new behavior was created (1 in this case)
523         // This will not work for a Candidate because as soon as a Candidate
524         // is created it increments the term
525         short leaderPayloadVersion = 10;
526         String leaderId = "leader-1";
527         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
528
529         follower = createBehavior(context);
530
531         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
532
533         Assert.assertSame(follower, newBehavior);
534
535         assertEquals("Next index", 5, log.last().getIndex() + 1);
536         assertEquals("Entry 3", entries.get(0), log.get(3));
537         assertEquals("Entry 4", entries.get(1), log.get(4));
538
539         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
540         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
541
542         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
543     }
544
545     /**
546      * This test verifies that when a new AppendEntries message is received with
547      * new entries and the logs of the sender and receiver are out-of-sync that
548      * the log is first corrected by removing the out of sync entries from the
549      * log and then adding in the new entries sent with the AppendEntries message
550      */
551     @Test
552     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
553         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
554
555         MockRaftActorContext context = createActorContext();
556
557         // First set the receivers term to lower number
558         context.getTermInformation().update(1, "test");
559
560         // Prepare the receivers log
561         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
562         log.append(newReplicatedLogEntry(1, 0, "zero"));
563         log.append(newReplicatedLogEntry(1, 1, "one"));
564         log.append(newReplicatedLogEntry(1, 2, "two"));
565
566         context.setReplicatedLog(log);
567
568         // Prepare the entries to be sent with AppendEntries
569         List<ReplicatedLogEntry> entries = new ArrayList<>();
570         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
571         entries.add(newReplicatedLogEntry(2, 3, "three"));
572
573         // Send appendEntries with the same term as was set on the receiver
574         // before the new behavior was created (1 in this case)
575         // This will not work for a Candidate because as soon as a Candidate
576         // is created it increments the term
577         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
578
579         follower = createBehavior(context);
580
581         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
582
583         Assert.assertSame(follower, newBehavior);
584
585         // The entry at index 2 will be found out-of-sync with the leader
586         // and will be removed
587         // Then the two new entries will be added to the log
588         // Thus making the log to have 4 entries
589         assertEquals("Next index", 4, log.last().getIndex() + 1);
590         //assertEquals("Entry 2", entries.get(0), log.get(2));
591
592         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
593
594         // Check that the entry at index 2 has the new data
595         assertEquals("Entry 2", entries.get(0), log.get(2));
596
597         assertEquals("Entry 3", entries.get(1), log.get(3));
598
599         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
600     }
601
602     @Test
603     public void testHandleAppendEntriesPreviousLogEntryMissing(){
604         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
605
606         MockRaftActorContext context = createActorContext();
607
608         // Prepare the receivers log
609         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
610         log.append(newReplicatedLogEntry(1, 0, "zero"));
611         log.append(newReplicatedLogEntry(1, 1, "one"));
612         log.append(newReplicatedLogEntry(1, 2, "two"));
613
614         context.setReplicatedLog(log);
615
616         // Prepare the entries to be sent with AppendEntries
617         List<ReplicatedLogEntry> entries = new ArrayList<>();
618         entries.add(newReplicatedLogEntry(1, 4, "four"));
619
620         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
621
622         follower = createBehavior(context);
623
624         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
625
626         Assert.assertSame(follower, newBehavior);
627
628         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
629     }
630
631     @Test
632     public void testHandleAppendEntriesWithExistingLogEntry() {
633         logStart("testHandleAppendEntriesWithExistingLogEntry");
634
635         MockRaftActorContext context = createActorContext();
636
637         context.getTermInformation().update(1, "test");
638
639         // Prepare the receivers log
640         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
641         log.append(newReplicatedLogEntry(1, 0, "zero"));
642         log.append(newReplicatedLogEntry(1, 1, "one"));
643
644         context.setReplicatedLog(log);
645
646         // Send the last entry again.
647         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
648
649         follower = createBehavior(context);
650
651         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
652
653         assertEquals("Next index", 2, log.last().getIndex() + 1);
654         assertEquals("Entry 1", entries.get(0), log.get(1));
655
656         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
657
658         // Send the last entry again and also a new one.
659
660         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
661
662         leaderActor.underlyingActor().clear();
663         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
664
665         assertEquals("Next index", 3, log.last().getIndex() + 1);
666         assertEquals("Entry 1", entries.get(0), log.get(1));
667         assertEquals("Entry 2", entries.get(1), log.get(2));
668
669         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
670     }
671
672     @Test
673     public void testHandleAppendEntriesAfterInstallingSnapshot(){
674         logStart("testHandleAppendAfterInstallingSnapshot");
675
676         MockRaftActorContext context = createActorContext();
677
678         // Prepare the receivers log
679         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
680
681         // Set up a log as if it has been snapshotted
682         log.setSnapshotIndex(3);
683         log.setSnapshotTerm(1);
684
685         context.setReplicatedLog(log);
686
687         // Prepare the entries to be sent with AppendEntries
688         List<ReplicatedLogEntry> entries = new ArrayList<>();
689         entries.add(newReplicatedLogEntry(1, 4, "four"));
690
691         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
692
693         follower = createBehavior(context);
694
695         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
696
697         Assert.assertSame(follower, newBehavior);
698
699         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
700     }
701
702
703     /**
704      * This test verifies that when InstallSnapshot is received by
705      * the follower its applied correctly.
706      *
707      * @throws Exception
708      */
709     @Test
710     public void testHandleInstallSnapshot() throws Exception {
711         logStart("testHandleInstallSnapshot");
712
713         MockRaftActorContext context = createActorContext();
714
715         follower = createBehavior(context);
716
717         ByteString bsSnapshot  = createSnapshot();
718         int offset = 0;
719         int snapshotLength = bsSnapshot.size();
720         int chunkSize = 50;
721         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
722         int lastIncludedIndex = 1;
723         int chunkIndex = 1;
724         InstallSnapshot lastInstallSnapshot = null;
725
726         for(int i = 0; i < totalChunks; i++) {
727             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
728             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
729                     chunkData, chunkIndex, totalChunks);
730             follower.handleMessage(leaderActor, lastInstallSnapshot);
731             offset = offset + 50;
732             lastIncludedIndex++;
733             chunkIndex++;
734         }
735
736         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
737                 ApplySnapshot.class);
738         Snapshot snapshot = applySnapshot.getSnapshot();
739         assertNotNull(lastInstallSnapshot);
740         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
741         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
742                 snapshot.getLastAppliedTerm());
743         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
744                 snapshot.getLastAppliedIndex());
745         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
746         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
747
748         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
749                 leaderActor, InstallSnapshotReply.class);
750         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
751
752         chunkIndex = 1;
753         for(InstallSnapshotReply reply: replies) {
754             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
755             assertEquals("getTerm", 1, reply.getTerm());
756             assertEquals("isSuccess", true, reply.isSuccess());
757             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
758         }
759
760         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
761     }
762
763
764     /**
765      * Verify that when an AppendEntries is sent to a follower during a snapshot install
766      * the Follower short-circuits the processing of the AppendEntries message.
767      *
768      * @throws Exception
769      */
770     @Test
771     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
772         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
773
774         MockRaftActorContext context = createActorContext();
775
776         follower = createBehavior(context);
777
778         ByteString bsSnapshot  = createSnapshot();
779         int snapshotLength = bsSnapshot.size();
780         int chunkSize = 50;
781         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
782         int lastIncludedIndex = 1;
783
784         // Check that snapshot installation is not in progress
785         assertNull(((Follower) follower).getSnapshotTracker());
786
787         // Make sure that we have more than 1 chunk to send
788         assertTrue(totalChunks > 1);
789
790         // Send an install snapshot with the first chunk to start the process of installing a snapshot
791         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
792         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
793                 chunkData, 1, totalChunks));
794
795         // Check if snapshot installation is in progress now
796         assertNotNull(((Follower) follower).getSnapshotTracker());
797
798         // Send an append entry
799         AppendEntries appendEntries = mock(AppendEntries.class);
800         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
801
802         follower.handleMessage(leaderActor, appendEntries);
803
804         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
805         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
806         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
807         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
808
809         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
810         verify(appendEntries, never()).getPrevLogIndex();
811
812     }
813
814     @Test
815     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
816         logStart("testInitialSyncUpWithHandleInstallSnapshot");
817
818         MockRaftActorContext context = createActorContext();
819
820         follower = createBehavior(context);
821
822         ByteString bsSnapshot  = createSnapshot();
823         int offset = 0;
824         int snapshotLength = bsSnapshot.size();
825         int chunkSize = 50;
826         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
827         int lastIncludedIndex = 1;
828         int chunkIndex = 1;
829         InstallSnapshot lastInstallSnapshot = null;
830
831         for(int i = 0; i < totalChunks; i++) {
832             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
833             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
834                     chunkData, chunkIndex, totalChunks);
835             follower.handleMessage(leaderActor, lastInstallSnapshot);
836             offset = offset + 50;
837             lastIncludedIndex++;
838             chunkIndex++;
839         }
840
841         FollowerInitialSyncUpStatus syncStatus =
842                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
843
844         assertFalse(syncStatus.isInitialSyncDone());
845
846         // Clear all the messages
847         followerActor.underlyingActor().clear();
848
849         context.setLastApplied(101);
850         context.setCommitIndex(101);
851         setLastLogEntry(context, 1, 101,
852                 new MockRaftActorContext.MockPayload(""));
853
854         List<ReplicatedLogEntry> entries = Arrays.asList(
855                 newReplicatedLogEntry(2, 101, "foo"));
856
857         // The new commitIndex is 101
858         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
859         follower.handleMessage(leaderActor, appendEntries);
860
861         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
862
863         assertTrue(syncStatus.isInitialSyncDone());
864     }
865
866     @Test
867     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
868         logStart("testHandleOutOfSequenceInstallSnapshot");
869
870         MockRaftActorContext context = createActorContext();
871
872         follower = createBehavior(context);
873
874         ByteString bsSnapshot = createSnapshot();
875
876         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
877                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
878         follower.handleMessage(leaderActor, installSnapshot);
879
880         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
881                 InstallSnapshotReply.class);
882
883         assertEquals("isSuccess", false, reply.isSuccess());
884         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
885         assertEquals("getTerm", 1, reply.getTerm());
886         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
887
888         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
889     }
890
891     @Test
892     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
893         MockRaftActorContext context = createActorContext();
894
895         Stopwatch stopwatch = Stopwatch.createStarted();
896
897         follower = createBehavior(context);
898
899         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
900
901         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
902
903         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
904     }
905
906     @Test
907     public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
908         MockRaftActorContext context = createActorContext();
909         context.setConfigParams(new DefaultConfigParamsImpl(){
910             @Override
911             public FiniteDuration getElectionTimeOutInterval() {
912                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
913             }
914         });
915
916         context.setRaftPolicy(createRaftPolicy(false, false));
917
918         follower = createBehavior(context);
919
920         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
921     }
922
923     @Test
924     public void testElectionScheduledWhenAnyRaftRPCReceived(){
925         MockRaftActorContext context = createActorContext();
926         follower = createBehavior(context);
927         follower.handleMessage(leaderActor, new RaftRPC() {
928             @Override
929             public long getTerm() {
930                 return 100;
931             }
932         });
933         assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
934     }
935
936     @Test
937     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
938         MockRaftActorContext context = createActorContext();
939         follower = createBehavior(context);
940         follower.handleMessage(leaderActor, "non-raft-rpc");
941         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
942     }
943
944     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
945         int snapshotLength = bs.size();
946         int start = offset;
947         int size = chunkSize;
948         if (chunkSize > snapshotLength) {
949             size = snapshotLength;
950         } else {
951             if ((start + chunkSize) > snapshotLength) {
952                 size = snapshotLength - start;
953             }
954         }
955         return bs.substring(start, start + size);
956     }
957
958     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
959             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
960
961         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
962                 AppendEntriesReply.class);
963
964         assertEquals("isSuccess", expSuccess, reply.isSuccess());
965         assertEquals("getTerm", expTerm, reply.getTerm());
966         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
967         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
968         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
969         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
970     }
971
972     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
973         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
974                 new MockRaftActorContext.MockPayload(data));
975     }
976
977     private ByteString createSnapshot(){
978         HashMap<String, String> followerSnapshot = new HashMap<>();
979         followerSnapshot.put("1", "A");
980         followerSnapshot.put("2", "B");
981         followerSnapshot.put("3", "C");
982
983         return toByteString(followerSnapshot);
984     }
985
986     @Override
987     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
988             ActorRef actorRef, RaftRPC rpc) throws Exception {
989         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
990
991         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
992         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
993     }
994
995     @Override
996     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
997             throws Exception {
998         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
999         assertEquals("isSuccess", true, reply.isSuccess());
1000     }
1001
1002     private static class TestFollower extends Follower {
1003
1004         int electionTimeoutCount = 0;
1005
1006         public TestFollower(RaftActorContext context) {
1007             super(context);
1008         }
1009
1010         @Override
1011         protected void scheduleElection(FiniteDuration interval) {
1012             electionTimeoutCount++;
1013             super.scheduleElection(interval);
1014         }
1015
1016         public int getElectionTimeoutCount() {
1017             return electionTimeoutCount;
1018         }
1019     }
1020 }