BUG 4212 : Follower should not reschedule election timeout in certain cases.
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.Snapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
32 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
37 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
39 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import scala.concurrent.duration.FiniteDuration;
42
43 public class FollowerTest extends AbstractRaftActorBehaviorTest {
44
45     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
46             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
47
48     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
49             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
50
51     private RaftActorBehavior follower;
52
53     private final short payloadVersion = 5;
54
55     @Override
56     @After
57     public void tearDown() throws Exception {
58         if(follower != null) {
59             follower.close();
60         }
61
62         super.tearDown();
63     }
64
65     @Override
66     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
67         return new TestFollower(actorContext);
68     }
69
70     @Override
71     protected  MockRaftActorContext createActorContext() {
72         return createActorContext(followerActor);
73     }
74
75     @Override
76     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
77         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
78         context.setPayloadVersion(payloadVersion );
79         return context;
80     }
81
82     private int getElectionTimeoutCount(RaftActorBehavior follower){
83         if(follower instanceof TestFollower){
84             return ((TestFollower) follower).getElectionTimeoutCount();
85         }
86         return -1;
87     }
88
89     @Test
90     public void testThatAnElectionTimeoutIsTriggered(){
91         MockRaftActorContext actorContext = createActorContext();
92         follower = new Follower(actorContext);
93
94         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
95                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
96     }
97
98     @Test
99     public void testHandleElectionTimeout(){
100         logStart("testHandleElectionTimeout");
101
102         follower = new Follower(createActorContext());
103
104         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
105
106         assertTrue(raftBehavior instanceof Candidate);
107     }
108
109     @Test
110     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
111         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
112
113         RaftActorContext context = createActorContext();
114         long term = 1000;
115         context.getTermInformation().update(term, null);
116
117         follower = createBehavior(context);
118
119         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
120
121         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
122
123         assertEquals("isVoteGranted", true, reply.isVoteGranted());
124         assertEquals("getTerm", term, reply.getTerm());
125         assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
126     }
127
128     @Test
129     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
130         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
131
132         RaftActorContext context = createActorContext();
133         long term = 1000;
134         context.getTermInformation().update(term, "test");
135
136         follower = createBehavior(context);
137
138         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
139
140         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
141
142         assertEquals("isVoteGranted", false, reply.isVoteGranted());
143         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
144     }
145
146
147     @Test
148     public void testHandleFirstAppendEntries() throws Exception {
149         logStart("testHandleFirstAppendEntries");
150
151         MockRaftActorContext context = createActorContext();
152         context.getReplicatedLog().clear(0,2);
153         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
154         context.getReplicatedLog().setSnapshotIndex(99);
155
156         List<ReplicatedLogEntry> entries = Arrays.asList(
157                 newReplicatedLogEntry(2, 101, "foo"));
158
159         Assert.assertEquals(1, context.getReplicatedLog().size());
160
161         // The new commitIndex is 101
162         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
163
164         follower = createBehavior(context);
165         follower.handleMessage(leaderActor, appendEntries);
166
167         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
168         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
169
170         assertFalse(syncStatus.isInitialSyncDone());
171         assertTrue("append entries reply should be true", reply.isSuccess());
172     }
173
174     @Test
175     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
176         logStart("testHandleFirstAppendEntries");
177
178         MockRaftActorContext context = createActorContext();
179
180         List<ReplicatedLogEntry> entries = Arrays.asList(
181                 newReplicatedLogEntry(2, 101, "foo"));
182
183         // The new commitIndex is 101
184         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
185
186         follower = createBehavior(context);
187         follower.handleMessage(leaderActor, appendEntries);
188
189         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
190         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
191
192         assertFalse(syncStatus.isInitialSyncDone());
193         assertFalse("append entries reply should be false", reply.isSuccess());
194     }
195
196     @Test
197     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
198         logStart("testHandleFirstAppendEntries");
199
200         MockRaftActorContext context = createActorContext();
201         context.getReplicatedLog().clear(0,2);
202         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
203         context.getReplicatedLog().setSnapshotIndex(99);
204
205         List<ReplicatedLogEntry> entries = Arrays.asList(
206                 newReplicatedLogEntry(2, 101, "foo"));
207
208         // The new commitIndex is 101
209         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
210
211         follower = createBehavior(context);
212         follower.handleMessage(leaderActor, appendEntries);
213
214         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
215         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
216
217         assertFalse(syncStatus.isInitialSyncDone());
218         assertTrue("append entries reply should be true", reply.isSuccess());
219     }
220
221     @Test
222     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
223         logStart("testHandleFirstAppendEntries");
224
225         MockRaftActorContext context = createActorContext();
226         context.getReplicatedLog().clear(0,2);
227         context.getReplicatedLog().setSnapshotIndex(100);
228
229         List<ReplicatedLogEntry> entries = Arrays.asList(
230                 newReplicatedLogEntry(2, 101, "foo"));
231
232         // The new commitIndex is 101
233         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
234
235         follower = createBehavior(context);
236         follower.handleMessage(leaderActor, appendEntries);
237
238         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
239         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
240
241         assertFalse(syncStatus.isInitialSyncDone());
242         assertTrue("append entries reply should be true", reply.isSuccess());
243     }
244
245     @Test
246     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
247         logStart("testHandleFirstAppendEntries");
248
249         MockRaftActorContext context = createActorContext();
250         context.getReplicatedLog().clear(0,2);
251         context.getReplicatedLog().setSnapshotIndex(100);
252
253         List<ReplicatedLogEntry> entries = Arrays.asList(
254                 newReplicatedLogEntry(2, 105, "foo"));
255
256         // The new commitIndex is 101
257         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
258
259         follower = createBehavior(context);
260         follower.handleMessage(leaderActor, appendEntries);
261
262         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
263         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
264
265         assertFalse(syncStatus.isInitialSyncDone());
266         assertFalse("append entries reply should be false", reply.isSuccess());
267     }
268
269     @Test
270     public void testHandleSyncUpAppendEntries() throws Exception {
271         logStart("testHandleSyncUpAppendEntries");
272
273         MockRaftActorContext context = createActorContext();
274
275         List<ReplicatedLogEntry> entries = Arrays.asList(
276                 newReplicatedLogEntry(2, 101, "foo"));
277
278         // The new commitIndex is 101
279         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
280
281         follower = createBehavior(context);
282         follower.handleMessage(leaderActor, appendEntries);
283
284         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
285
286         assertFalse(syncStatus.isInitialSyncDone());
287
288         // Clear all the messages
289         followerActor.underlyingActor().clear();
290
291         context.setLastApplied(101);
292         context.setCommitIndex(101);
293         setLastLogEntry(context, 1, 101,
294                 new MockRaftActorContext.MockPayload(""));
295
296         entries = Arrays.asList(
297                 newReplicatedLogEntry(2, 101, "foo"));
298
299         // The new commitIndex is 101
300         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
301         follower.handleMessage(leaderActor, appendEntries);
302
303         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
304
305         assertTrue(syncStatus.isInitialSyncDone());
306
307         followerActor.underlyingActor().clear();
308
309         // Sending the same message again should not generate another message
310
311         follower.handleMessage(leaderActor, appendEntries);
312
313         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
314
315         assertNull(syncStatus);
316
317     }
318
319     @Test
320     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
321         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
322
323         MockRaftActorContext context = createActorContext();
324
325         List<ReplicatedLogEntry> entries = Arrays.asList(
326                 newReplicatedLogEntry(2, 101, "foo"));
327
328         // The new commitIndex is 101
329         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
330
331         follower = createBehavior(context);
332         follower.handleMessage(leaderActor, appendEntries);
333
334         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
335
336         assertFalse(syncStatus.isInitialSyncDone());
337
338         // Clear all the messages
339         followerActor.underlyingActor().clear();
340
341         context.setLastApplied(100);
342         setLastLogEntry(context, 1, 100,
343                 new MockRaftActorContext.MockPayload(""));
344
345         entries = Arrays.asList(
346                 newReplicatedLogEntry(2, 101, "foo"));
347
348         // leader-2 is becoming the leader now and it says the commitIndex is 45
349         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
350         follower.handleMessage(leaderActor, appendEntries);
351
352         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
353
354         // We get a new message saying initial status is not done
355         assertFalse(syncStatus.isInitialSyncDone());
356
357     }
358
359
360     @Test
361     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
362         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
363
364         MockRaftActorContext context = createActorContext();
365
366         List<ReplicatedLogEntry> entries = Arrays.asList(
367                 newReplicatedLogEntry(2, 101, "foo"));
368
369         // The new commitIndex is 101
370         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
371
372         follower = createBehavior(context);
373         follower.handleMessage(leaderActor, appendEntries);
374
375         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
376
377         assertFalse(syncStatus.isInitialSyncDone());
378
379         // Clear all the messages
380         followerActor.underlyingActor().clear();
381
382         context.setLastApplied(101);
383         context.setCommitIndex(101);
384         setLastLogEntry(context, 1, 101,
385                 new MockRaftActorContext.MockPayload(""));
386
387         entries = Arrays.asList(
388                 newReplicatedLogEntry(2, 101, "foo"));
389
390         // The new commitIndex is 101
391         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
392         follower.handleMessage(leaderActor, appendEntries);
393
394         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
395
396         assertTrue(syncStatus.isInitialSyncDone());
397
398         // Clear all the messages
399         followerActor.underlyingActor().clear();
400
401         context.setLastApplied(100);
402         setLastLogEntry(context, 1, 100,
403                 new MockRaftActorContext.MockPayload(""));
404
405         entries = Arrays.asList(
406                 newReplicatedLogEntry(2, 101, "foo"));
407
408         // leader-2 is becoming the leader now and it says the commitIndex is 45
409         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
410         follower.handleMessage(leaderActor, appendEntries);
411
412         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
413
414         // We get a new message saying initial status is not done
415         assertFalse(syncStatus.isInitialSyncDone());
416
417     }
418
419
420     /**
421      * This test verifies that when an AppendEntries RPC is received by a RaftActor
422      * with a commitIndex that is greater than what has been applied to the
423      * state machine of the RaftActor, the RaftActor applies the state and
424      * sets it current applied state to the commitIndex of the sender.
425      *
426      * @throws Exception
427      */
428     @Test
429     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
430         logStart("testHandleAppendEntriesWithNewerCommitIndex");
431
432         MockRaftActorContext context = createActorContext();
433
434         context.setLastApplied(100);
435         setLastLogEntry(context, 1, 100,
436                 new MockRaftActorContext.MockPayload(""));
437         context.getReplicatedLog().setSnapshotIndex(99);
438
439         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
440                 newReplicatedLogEntry(2, 101, "foo"));
441
442         // The new commitIndex is 101
443         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
444
445         follower = createBehavior(context);
446         follower.handleMessage(leaderActor, appendEntries);
447
448         assertEquals("getLastApplied", 101L, context.getLastApplied());
449     }
450
451     /**
452      * This test verifies that when an AppendEntries is received a specific prevLogTerm
453      * which does not match the term that is in RaftActors log entry at prevLogIndex
454      * then the RaftActor does not change it's state and it returns a failure.
455      *
456      * @throws Exception
457      */
458     @Test
459     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
460         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
461
462         MockRaftActorContext context = createActorContext();
463
464         // First set the receivers term to lower number
465         context.getTermInformation().update(95, "test");
466
467         // AppendEntries is now sent with a bigger term
468         // this will set the receivers term to be the same as the sender's term
469         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
470
471         follower = createBehavior(context);
472
473         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
474
475         Assert.assertSame(follower, newBehavior);
476
477         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
478                 AppendEntriesReply.class);
479
480         assertEquals("isSuccess", false, reply.isSuccess());
481     }
482
483     /**
484      * This test verifies that when a new AppendEntries message is received with
485      * new entries and the logs of the sender and receiver match that the new
486      * entries get added to the log and the log is incremented by the number of
487      * entries received in appendEntries
488      *
489      * @throws Exception
490      */
491     @Test
492     public void testHandleAppendEntriesAddNewEntries() {
493         logStart("testHandleAppendEntriesAddNewEntries");
494
495         MockRaftActorContext context = createActorContext();
496
497         // First set the receivers term to lower number
498         context.getTermInformation().update(1, "test");
499
500         // Prepare the receivers log
501         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
502         log.append(newReplicatedLogEntry(1, 0, "zero"));
503         log.append(newReplicatedLogEntry(1, 1, "one"));
504         log.append(newReplicatedLogEntry(1, 2, "two"));
505
506         context.setReplicatedLog(log);
507
508         // Prepare the entries to be sent with AppendEntries
509         List<ReplicatedLogEntry> entries = new ArrayList<>();
510         entries.add(newReplicatedLogEntry(1, 3, "three"));
511         entries.add(newReplicatedLogEntry(1, 4, "four"));
512
513         // Send appendEntries with the same term as was set on the receiver
514         // before the new behavior was created (1 in this case)
515         // This will not work for a Candidate because as soon as a Candidate
516         // is created it increments the term
517         short leaderPayloadVersion = 10;
518         String leaderId = "leader-1";
519         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
520
521         follower = createBehavior(context);
522
523         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
524
525         Assert.assertSame(follower, newBehavior);
526
527         assertEquals("Next index", 5, log.last().getIndex() + 1);
528         assertEquals("Entry 3", entries.get(0), log.get(3));
529         assertEquals("Entry 4", entries.get(1), log.get(4));
530
531         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
532         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
533
534         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
535     }
536
537     /**
538      * This test verifies that when a new AppendEntries message is received with
539      * new entries and the logs of the sender and receiver are out-of-sync that
540      * the log is first corrected by removing the out of sync entries from the
541      * log and then adding in the new entries sent with the AppendEntries message
542      */
543     @Test
544     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
545         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
546
547         MockRaftActorContext context = createActorContext();
548
549         // First set the receivers term to lower number
550         context.getTermInformation().update(1, "test");
551
552         // Prepare the receivers log
553         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
554         log.append(newReplicatedLogEntry(1, 0, "zero"));
555         log.append(newReplicatedLogEntry(1, 1, "one"));
556         log.append(newReplicatedLogEntry(1, 2, "two"));
557
558         context.setReplicatedLog(log);
559
560         // Prepare the entries to be sent with AppendEntries
561         List<ReplicatedLogEntry> entries = new ArrayList<>();
562         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
563         entries.add(newReplicatedLogEntry(2, 3, "three"));
564
565         // Send appendEntries with the same term as was set on the receiver
566         // before the new behavior was created (1 in this case)
567         // This will not work for a Candidate because as soon as a Candidate
568         // is created it increments the term
569         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
570
571         follower = createBehavior(context);
572
573         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
574
575         Assert.assertSame(follower, newBehavior);
576
577         // The entry at index 2 will be found out-of-sync with the leader
578         // and will be removed
579         // Then the two new entries will be added to the log
580         // Thus making the log to have 4 entries
581         assertEquals("Next index", 4, log.last().getIndex() + 1);
582         //assertEquals("Entry 2", entries.get(0), log.get(2));
583
584         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
585
586         // Check that the entry at index 2 has the new data
587         assertEquals("Entry 2", entries.get(0), log.get(2));
588
589         assertEquals("Entry 3", entries.get(1), log.get(3));
590
591         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
592     }
593
594     @Test
595     public void testHandleAppendEntriesPreviousLogEntryMissing(){
596         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
597
598         MockRaftActorContext context = createActorContext();
599
600         // Prepare the receivers log
601         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
602         log.append(newReplicatedLogEntry(1, 0, "zero"));
603         log.append(newReplicatedLogEntry(1, 1, "one"));
604         log.append(newReplicatedLogEntry(1, 2, "two"));
605
606         context.setReplicatedLog(log);
607
608         // Prepare the entries to be sent with AppendEntries
609         List<ReplicatedLogEntry> entries = new ArrayList<>();
610         entries.add(newReplicatedLogEntry(1, 4, "four"));
611
612         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
613
614         follower = createBehavior(context);
615
616         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
617
618         Assert.assertSame(follower, newBehavior);
619
620         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
621     }
622
623     @Test
624     public void testHandleAppendEntriesWithExistingLogEntry() {
625         logStart("testHandleAppendEntriesWithExistingLogEntry");
626
627         MockRaftActorContext context = createActorContext();
628
629         context.getTermInformation().update(1, "test");
630
631         // Prepare the receivers log
632         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
633         log.append(newReplicatedLogEntry(1, 0, "zero"));
634         log.append(newReplicatedLogEntry(1, 1, "one"));
635
636         context.setReplicatedLog(log);
637
638         // Send the last entry again.
639         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
640
641         follower = createBehavior(context);
642
643         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
644
645         assertEquals("Next index", 2, log.last().getIndex() + 1);
646         assertEquals("Entry 1", entries.get(0), log.get(1));
647
648         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
649
650         // Send the last entry again and also a new one.
651
652         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
653
654         leaderActor.underlyingActor().clear();
655         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
656
657         assertEquals("Next index", 3, log.last().getIndex() + 1);
658         assertEquals("Entry 1", entries.get(0), log.get(1));
659         assertEquals("Entry 2", entries.get(1), log.get(2));
660
661         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
662     }
663
664     @Test
665     public void testHandleAppendEntriesAfterInstallingSnapshot(){
666         logStart("testHandleAppendAfterInstallingSnapshot");
667
668         MockRaftActorContext context = createActorContext();
669
670         // Prepare the receivers log
671         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
672
673         // Set up a log as if it has been snapshotted
674         log.setSnapshotIndex(3);
675         log.setSnapshotTerm(1);
676
677         context.setReplicatedLog(log);
678
679         // Prepare the entries to be sent with AppendEntries
680         List<ReplicatedLogEntry> entries = new ArrayList<>();
681         entries.add(newReplicatedLogEntry(1, 4, "four"));
682
683         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
684
685         follower = createBehavior(context);
686
687         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
688
689         Assert.assertSame(follower, newBehavior);
690
691         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
692     }
693
694
695     /**
696      * This test verifies that when InstallSnapshot is received by
697      * the follower its applied correctly.
698      *
699      * @throws Exception
700      */
701     @Test
702     public void testHandleInstallSnapshot() throws Exception {
703         logStart("testHandleInstallSnapshot");
704
705         MockRaftActorContext context = createActorContext();
706
707         follower = createBehavior(context);
708
709         ByteString bsSnapshot  = createSnapshot();
710         int offset = 0;
711         int snapshotLength = bsSnapshot.size();
712         int chunkSize = 50;
713         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
714         int lastIncludedIndex = 1;
715         int chunkIndex = 1;
716         InstallSnapshot lastInstallSnapshot = null;
717
718         for(int i = 0; i < totalChunks; i++) {
719             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
720             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
721                     chunkData, chunkIndex, totalChunks);
722             follower.handleMessage(leaderActor, lastInstallSnapshot);
723             offset = offset + 50;
724             lastIncludedIndex++;
725             chunkIndex++;
726         }
727
728         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
729                 ApplySnapshot.class);
730         Snapshot snapshot = applySnapshot.getSnapshot();
731         assertNotNull(lastInstallSnapshot);
732         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
733         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
734                 snapshot.getLastAppliedTerm());
735         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
736                 snapshot.getLastAppliedIndex());
737         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
738         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
739
740         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
741                 leaderActor, InstallSnapshotReply.class);
742         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
743
744         chunkIndex = 1;
745         for(InstallSnapshotReply reply: replies) {
746             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
747             assertEquals("getTerm", 1, reply.getTerm());
748             assertEquals("isSuccess", true, reply.isSuccess());
749             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
750         }
751
752         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
753     }
754
755
756     /**
757      * Verify that when an AppendEntries is sent to a follower during a snapshot install
758      * the Follower short-circuits the processing of the AppendEntries message.
759      *
760      * @throws Exception
761      */
762     @Test
763     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
764         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
765
766         MockRaftActorContext context = createActorContext();
767
768         follower = createBehavior(context);
769
770         ByteString bsSnapshot  = createSnapshot();
771         int snapshotLength = bsSnapshot.size();
772         int chunkSize = 50;
773         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
774         int lastIncludedIndex = 1;
775
776         // Check that snapshot installation is not in progress
777         assertNull(((Follower) follower).getSnapshotTracker());
778
779         // Make sure that we have more than 1 chunk to send
780         assertTrue(totalChunks > 1);
781
782         // Send an install snapshot with the first chunk to start the process of installing a snapshot
783         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
784         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
785                 chunkData, 1, totalChunks));
786
787         // Check if snapshot installation is in progress now
788         assertNotNull(((Follower) follower).getSnapshotTracker());
789
790         // Send an append entry
791         AppendEntries appendEntries = mock(AppendEntries.class);
792         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
793
794         follower.handleMessage(leaderActor, appendEntries);
795
796         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
797         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
798         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
799         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
800
801         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
802         verify(appendEntries, never()).getPrevLogIndex();
803
804     }
805
806     @Test
807     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
808         logStart("testInitialSyncUpWithHandleInstallSnapshot");
809
810         MockRaftActorContext context = createActorContext();
811
812         follower = createBehavior(context);
813
814         ByteString bsSnapshot  = createSnapshot();
815         int offset = 0;
816         int snapshotLength = bsSnapshot.size();
817         int chunkSize = 50;
818         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
819         int lastIncludedIndex = 1;
820         int chunkIndex = 1;
821         InstallSnapshot lastInstallSnapshot = null;
822
823         for(int i = 0; i < totalChunks; i++) {
824             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
825             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
826                     chunkData, chunkIndex, totalChunks);
827             follower.handleMessage(leaderActor, lastInstallSnapshot);
828             offset = offset + 50;
829             lastIncludedIndex++;
830             chunkIndex++;
831         }
832
833         FollowerInitialSyncUpStatus syncStatus =
834                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
835
836         assertFalse(syncStatus.isInitialSyncDone());
837
838         // Clear all the messages
839         followerActor.underlyingActor().clear();
840
841         context.setLastApplied(101);
842         context.setCommitIndex(101);
843         setLastLogEntry(context, 1, 101,
844                 new MockRaftActorContext.MockPayload(""));
845
846         List<ReplicatedLogEntry> entries = Arrays.asList(
847                 newReplicatedLogEntry(2, 101, "foo"));
848
849         // The new commitIndex is 101
850         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
851         follower.handleMessage(leaderActor, appendEntries);
852
853         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
854
855         assertTrue(syncStatus.isInitialSyncDone());
856     }
857
858     @Test
859     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
860         logStart("testHandleOutOfSequenceInstallSnapshot");
861
862         MockRaftActorContext context = createActorContext();
863
864         follower = createBehavior(context);
865
866         ByteString bsSnapshot = createSnapshot();
867
868         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
869                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
870         follower.handleMessage(leaderActor, installSnapshot);
871
872         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
873                 InstallSnapshotReply.class);
874
875         assertEquals("isSuccess", false, reply.isSuccess());
876         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
877         assertEquals("getTerm", 1, reply.getTerm());
878         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
879
880         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
881     }
882
883     @Test
884     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
885         MockRaftActorContext context = createActorContext();
886
887         Stopwatch stopwatch = Stopwatch.createStarted();
888
889         follower = createBehavior(context);
890
891         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
892
893         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
894
895         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
896     }
897
898     @Test
899     public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
900         MockRaftActorContext context = createActorContext();
901         context.setConfigParams(new DefaultConfigParamsImpl(){
902             @Override
903             public FiniteDuration getElectionTimeOutInterval() {
904                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
905             }
906         });
907
908         context.setRaftPolicy(createRaftPolicy(false, false));
909
910         follower = createBehavior(context);
911
912         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
913     }
914
915     @Test
916     public void testElectionScheduledWhenAnyRaftRPCReceived(){
917         MockRaftActorContext context = createActorContext();
918         follower = createBehavior(context);
919         follower.handleMessage(leaderActor, new RaftRPC() {
920             @Override
921             public long getTerm() {
922                 return 100;
923             }
924         });
925         assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
926     }
927
928     @Test
929     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
930         MockRaftActorContext context = createActorContext();
931         follower = createBehavior(context);
932         follower.handleMessage(leaderActor, "non-raft-rpc");
933         assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
934     }
935
936     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
937         int snapshotLength = bs.size();
938         int start = offset;
939         int size = chunkSize;
940         if (chunkSize > snapshotLength) {
941             size = snapshotLength;
942         } else {
943             if ((start + chunkSize) > snapshotLength) {
944                 size = snapshotLength - start;
945             }
946         }
947         return bs.substring(start, start + size);
948     }
949
950     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
951             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
952
953         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
954                 AppendEntriesReply.class);
955
956         assertEquals("isSuccess", expSuccess, reply.isSuccess());
957         assertEquals("getTerm", expTerm, reply.getTerm());
958         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
959         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
960         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
961         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
962     }
963
964     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
965         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
966                 new MockRaftActorContext.MockPayload(data));
967     }
968
969     private ByteString createSnapshot(){
970         HashMap<String, String> followerSnapshot = new HashMap<>();
971         followerSnapshot.put("1", "A");
972         followerSnapshot.put("2", "B");
973         followerSnapshot.put("3", "C");
974
975         return toByteString(followerSnapshot);
976     }
977
978     @Override
979     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
980             ActorRef actorRef, RaftRPC rpc) throws Exception {
981         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
982
983         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
984         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
985     }
986
987     @Override
988     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
989             throws Exception {
990         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
991         assertEquals("isSuccess", true, reply.isSuccess());
992     }
993
994     private static class TestFollower extends Follower {
995
996         int electionTimeoutCount = 0;
997
998         public TestFollower(RaftActorContext context) {
999             super(context);
1000         }
1001
1002         @Override
1003         protected void scheduleElection(FiniteDuration interval) {
1004             electionTimeoutCount++;
1005             super.scheduleElection(interval);
1006         }
1007
1008         public int getElectionTimeoutCount() {
1009             return electionTimeoutCount;
1010         }
1011     }
1012 }