Fix license header violations in sal-akka-raft
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.Props;
22 import akka.testkit.TestActorRef;
23 import com.google.common.base.Stopwatch;
24 import com.google.protobuf.ByteString;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftActorContext;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.Snapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
40 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
49 import scala.concurrent.duration.FiniteDuration;
50
51 public class FollowerTest extends AbstractRaftActorBehaviorTest {
52
53     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
54             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
55
56     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
57             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
58
59     private RaftActorBehavior follower;
60
61     private final short payloadVersion = 5;
62
63     @Override
64     @After
65     public void tearDown() throws Exception {
66         if(follower != null) {
67             follower.close();
68         }
69
70         super.tearDown();
71     }
72
73     @Override
74     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
75         return new Follower(actorContext);
76     }
77
78     @Override
79     protected  MockRaftActorContext createActorContext() {
80         return createActorContext(followerActor);
81     }
82
83     @Override
84     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
85         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
86         context.setPayloadVersion(payloadVersion );
87         return context;
88     }
89
90     @Test
91     public void testThatAnElectionTimeoutIsTriggered(){
92         MockRaftActorContext actorContext = createActorContext();
93         follower = new Follower(actorContext);
94
95         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
96                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
97     }
98
99     @Test
100     public void testHandleElectionTimeout(){
101         logStart("testHandleElectionTimeout");
102
103         follower = new Follower(createActorContext());
104
105         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
106
107         assertTrue(raftBehavior instanceof Candidate);
108     }
109
110     @Test
111     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
112         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
113
114         RaftActorContext context = createActorContext();
115         long term = 1000;
116         context.getTermInformation().update(term, null);
117
118         follower = createBehavior(context);
119
120         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
121
122         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
123
124         assertEquals("isVoteGranted", true, reply.isVoteGranted());
125         assertEquals("getTerm", term, reply.getTerm());
126     }
127
128     @Test
129     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
130         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
131
132         RaftActorContext context = createActorContext();
133         long term = 1000;
134         context.getTermInformation().update(term, "test");
135
136         follower = createBehavior(context);
137
138         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
139
140         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
141
142         assertEquals("isVoteGranted", false, reply.isVoteGranted());
143     }
144
145
146     @Test
147     public void testHandleFirstAppendEntries() throws Exception {
148         logStart("testHandleFirstAppendEntries");
149
150         MockRaftActorContext context = createActorContext();
151         context.getReplicatedLog().clear(0,2);
152         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
153         context.getReplicatedLog().setSnapshotIndex(99);
154
155         List<ReplicatedLogEntry> entries = Arrays.asList(
156                 newReplicatedLogEntry(2, 101, "foo"));
157
158         Assert.assertEquals(1, context.getReplicatedLog().size());
159
160         // The new commitIndex is 101
161         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
162
163         follower = createBehavior(context);
164         follower.handleMessage(leaderActor, appendEntries);
165
166         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
167         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
168
169         assertFalse(syncStatus.isInitialSyncDone());
170         assertTrue("append entries reply should be true", reply.isSuccess());
171     }
172
173     @Test
174     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
175         logStart("testHandleFirstAppendEntries");
176
177         MockRaftActorContext context = createActorContext();
178
179         List<ReplicatedLogEntry> entries = Arrays.asList(
180                 newReplicatedLogEntry(2, 101, "foo"));
181
182         // The new commitIndex is 101
183         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
184
185         follower = createBehavior(context);
186         follower.handleMessage(leaderActor, appendEntries);
187
188         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
189         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
190
191         assertFalse(syncStatus.isInitialSyncDone());
192         assertFalse("append entries reply should be false", reply.isSuccess());
193     }
194
195     @Test
196     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
197         logStart("testHandleFirstAppendEntries");
198
199         MockRaftActorContext context = createActorContext();
200         context.getReplicatedLog().clear(0,2);
201         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
202         context.getReplicatedLog().setSnapshotIndex(99);
203
204         List<ReplicatedLogEntry> entries = Arrays.asList(
205                 newReplicatedLogEntry(2, 101, "foo"));
206
207         // The new commitIndex is 101
208         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
209
210         follower = createBehavior(context);
211         follower.handleMessage(leaderActor, appendEntries);
212
213         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
214         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
215
216         assertFalse(syncStatus.isInitialSyncDone());
217         assertTrue("append entries reply should be true", reply.isSuccess());
218     }
219
220     @Test
221     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
222         logStart("testHandleFirstAppendEntries");
223
224         MockRaftActorContext context = createActorContext();
225         context.getReplicatedLog().clear(0,2);
226         context.getReplicatedLog().setSnapshotIndex(100);
227
228         List<ReplicatedLogEntry> entries = Arrays.asList(
229                 newReplicatedLogEntry(2, 101, "foo"));
230
231         // The new commitIndex is 101
232         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
233
234         follower = createBehavior(context);
235         follower.handleMessage(leaderActor, appendEntries);
236
237         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
238         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
239
240         assertFalse(syncStatus.isInitialSyncDone());
241         assertTrue("append entries reply should be true", reply.isSuccess());
242     }
243
244     @Test
245     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
246         logStart("testHandleFirstAppendEntries");
247
248         MockRaftActorContext context = createActorContext();
249         context.getReplicatedLog().clear(0,2);
250         context.getReplicatedLog().setSnapshotIndex(100);
251
252         List<ReplicatedLogEntry> entries = Arrays.asList(
253                 newReplicatedLogEntry(2, 105, "foo"));
254
255         // The new commitIndex is 101
256         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
257
258         follower = createBehavior(context);
259         follower.handleMessage(leaderActor, appendEntries);
260
261         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
262         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
263
264         assertFalse(syncStatus.isInitialSyncDone());
265         assertFalse("append entries reply should be false", reply.isSuccess());
266     }
267
268     @Test
269     public void testHandleSyncUpAppendEntries() throws Exception {
270         logStart("testHandleSyncUpAppendEntries");
271
272         MockRaftActorContext context = createActorContext();
273
274         List<ReplicatedLogEntry> entries = Arrays.asList(
275                 newReplicatedLogEntry(2, 101, "foo"));
276
277         // The new commitIndex is 101
278         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
279
280         follower = createBehavior(context);
281         follower.handleMessage(leaderActor, appendEntries);
282
283         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
284
285         assertFalse(syncStatus.isInitialSyncDone());
286
287         // Clear all the messages
288         followerActor.underlyingActor().clear();
289
290         context.setLastApplied(101);
291         context.setCommitIndex(101);
292         setLastLogEntry(context, 1, 101,
293                 new MockRaftActorContext.MockPayload(""));
294
295         entries = Arrays.asList(
296                 newReplicatedLogEntry(2, 101, "foo"));
297
298         // The new commitIndex is 101
299         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
300         follower.handleMessage(leaderActor, appendEntries);
301
302         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
303
304         assertTrue(syncStatus.isInitialSyncDone());
305
306         followerActor.underlyingActor().clear();
307
308         // Sending the same message again should not generate another message
309
310         follower.handleMessage(leaderActor, appendEntries);
311
312         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
313
314         assertNull(syncStatus);
315
316     }
317
318     @Test
319     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
320         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
321
322         MockRaftActorContext context = createActorContext();
323
324         List<ReplicatedLogEntry> entries = Arrays.asList(
325                 newReplicatedLogEntry(2, 101, "foo"));
326
327         // The new commitIndex is 101
328         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
329
330         follower = createBehavior(context);
331         follower.handleMessage(leaderActor, appendEntries);
332
333         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
334
335         assertFalse(syncStatus.isInitialSyncDone());
336
337         // Clear all the messages
338         followerActor.underlyingActor().clear();
339
340         context.setLastApplied(100);
341         setLastLogEntry(context, 1, 100,
342                 new MockRaftActorContext.MockPayload(""));
343
344         entries = Arrays.asList(
345                 newReplicatedLogEntry(2, 101, "foo"));
346
347         // leader-2 is becoming the leader now and it says the commitIndex is 45
348         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
349         follower.handleMessage(leaderActor, appendEntries);
350
351         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
352
353         // We get a new message saying initial status is not done
354         assertFalse(syncStatus.isInitialSyncDone());
355
356     }
357
358
359     @Test
360     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
361         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
362
363         MockRaftActorContext context = createActorContext();
364
365         List<ReplicatedLogEntry> entries = Arrays.asList(
366                 newReplicatedLogEntry(2, 101, "foo"));
367
368         // The new commitIndex is 101
369         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
370
371         follower = createBehavior(context);
372         follower.handleMessage(leaderActor, appendEntries);
373
374         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
375
376         assertFalse(syncStatus.isInitialSyncDone());
377
378         // Clear all the messages
379         followerActor.underlyingActor().clear();
380
381         context.setLastApplied(101);
382         context.setCommitIndex(101);
383         setLastLogEntry(context, 1, 101,
384                 new MockRaftActorContext.MockPayload(""));
385
386         entries = Arrays.asList(
387                 newReplicatedLogEntry(2, 101, "foo"));
388
389         // The new commitIndex is 101
390         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
391         follower.handleMessage(leaderActor, appendEntries);
392
393         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
394
395         assertTrue(syncStatus.isInitialSyncDone());
396
397         // Clear all the messages
398         followerActor.underlyingActor().clear();
399
400         context.setLastApplied(100);
401         setLastLogEntry(context, 1, 100,
402                 new MockRaftActorContext.MockPayload(""));
403
404         entries = Arrays.asList(
405                 newReplicatedLogEntry(2, 101, "foo"));
406
407         // leader-2 is becoming the leader now and it says the commitIndex is 45
408         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
409         follower.handleMessage(leaderActor, appendEntries);
410
411         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
412
413         // We get a new message saying initial status is not done
414         assertFalse(syncStatus.isInitialSyncDone());
415
416     }
417
418
419     /**
420      * This test verifies that when an AppendEntries RPC is received by a RaftActor
421      * with a commitIndex that is greater than what has been applied to the
422      * state machine of the RaftActor, the RaftActor applies the state and
423      * sets it current applied state to the commitIndex of the sender.
424      *
425      * @throws Exception
426      */
427     @Test
428     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
429         logStart("testHandleAppendEntriesWithNewerCommitIndex");
430
431         MockRaftActorContext context = createActorContext();
432
433         context.setLastApplied(100);
434         setLastLogEntry(context, 1, 100,
435                 new MockRaftActorContext.MockPayload(""));
436         context.getReplicatedLog().setSnapshotIndex(99);
437
438         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
439                 newReplicatedLogEntry(2, 101, "foo"));
440
441         // The new commitIndex is 101
442         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
443
444         follower = createBehavior(context);
445         follower.handleMessage(leaderActor, appendEntries);
446
447         assertEquals("getLastApplied", 101L, context.getLastApplied());
448     }
449
450     /**
451      * This test verifies that when an AppendEntries is received a specific prevLogTerm
452      * which does not match the term that is in RaftActors log entry at prevLogIndex
453      * then the RaftActor does not change it's state and it returns a failure.
454      *
455      * @throws Exception
456      */
457     @Test
458     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
459         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
460
461         MockRaftActorContext context = createActorContext();
462
463         // First set the receivers term to lower number
464         context.getTermInformation().update(95, "test");
465
466         // AppendEntries is now sent with a bigger term
467         // this will set the receivers term to be the same as the sender's term
468         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
469
470         follower = createBehavior(context);
471
472         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
473
474         Assert.assertSame(follower, newBehavior);
475
476         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
477                 AppendEntriesReply.class);
478
479         assertEquals("isSuccess", false, reply.isSuccess());
480     }
481
482     /**
483      * This test verifies that when a new AppendEntries message is received with
484      * new entries and the logs of the sender and receiver match that the new
485      * entries get added to the log and the log is incremented by the number of
486      * entries received in appendEntries
487      *
488      * @throws Exception
489      */
490     @Test
491     public void testHandleAppendEntriesAddNewEntries() {
492         logStart("testHandleAppendEntriesAddNewEntries");
493
494         MockRaftActorContext context = createActorContext();
495
496         // First set the receivers term to lower number
497         context.getTermInformation().update(1, "test");
498
499         // Prepare the receivers log
500         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
501         log.append(newReplicatedLogEntry(1, 0, "zero"));
502         log.append(newReplicatedLogEntry(1, 1, "one"));
503         log.append(newReplicatedLogEntry(1, 2, "two"));
504
505         context.setReplicatedLog(log);
506
507         // Prepare the entries to be sent with AppendEntries
508         List<ReplicatedLogEntry> entries = new ArrayList<>();
509         entries.add(newReplicatedLogEntry(1, 3, "three"));
510         entries.add(newReplicatedLogEntry(1, 4, "four"));
511
512         // Send appendEntries with the same term as was set on the receiver
513         // before the new behavior was created (1 in this case)
514         // This will not work for a Candidate because as soon as a Candidate
515         // is created it increments the term
516         short leaderPayloadVersion = 10;
517         String leaderId = "leader-1";
518         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
519
520         follower = createBehavior(context);
521
522         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
523
524         Assert.assertSame(follower, newBehavior);
525
526         assertEquals("Next index", 5, log.last().getIndex() + 1);
527         assertEquals("Entry 3", entries.get(0), log.get(3));
528         assertEquals("Entry 4", entries.get(1), log.get(4));
529
530         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
531         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
532
533         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
534     }
535
536     /**
537      * This test verifies that when a new AppendEntries message is received with
538      * new entries and the logs of the sender and receiver are out-of-sync that
539      * the log is first corrected by removing the out of sync entries from the
540      * log and then adding in the new entries sent with the AppendEntries message
541      */
542     @Test
543     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
544         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
545
546         MockRaftActorContext context = createActorContext();
547
548         // First set the receivers term to lower number
549         context.getTermInformation().update(1, "test");
550
551         // Prepare the receivers log
552         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
553         log.append(newReplicatedLogEntry(1, 0, "zero"));
554         log.append(newReplicatedLogEntry(1, 1, "one"));
555         log.append(newReplicatedLogEntry(1, 2, "two"));
556
557         context.setReplicatedLog(log);
558
559         // Prepare the entries to be sent with AppendEntries
560         List<ReplicatedLogEntry> entries = new ArrayList<>();
561         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
562         entries.add(newReplicatedLogEntry(2, 3, "three"));
563
564         // Send appendEntries with the same term as was set on the receiver
565         // before the new behavior was created (1 in this case)
566         // This will not work for a Candidate because as soon as a Candidate
567         // is created it increments the term
568         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
569
570         follower = createBehavior(context);
571
572         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
573
574         Assert.assertSame(follower, newBehavior);
575
576         // The entry at index 2 will be found out-of-sync with the leader
577         // and will be removed
578         // Then the two new entries will be added to the log
579         // Thus making the log to have 4 entries
580         assertEquals("Next index", 4, log.last().getIndex() + 1);
581         //assertEquals("Entry 2", entries.get(0), log.get(2));
582
583         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
584
585         // Check that the entry at index 2 has the new data
586         assertEquals("Entry 2", entries.get(0), log.get(2));
587
588         assertEquals("Entry 3", entries.get(1), log.get(3));
589
590         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
591     }
592
593     @Test
594     public void testHandleAppendEntriesPreviousLogEntryMissing(){
595         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
596
597         MockRaftActorContext context = createActorContext();
598
599         // Prepare the receivers log
600         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
601         log.append(newReplicatedLogEntry(1, 0, "zero"));
602         log.append(newReplicatedLogEntry(1, 1, "one"));
603         log.append(newReplicatedLogEntry(1, 2, "two"));
604
605         context.setReplicatedLog(log);
606
607         // Prepare the entries to be sent with AppendEntries
608         List<ReplicatedLogEntry> entries = new ArrayList<>();
609         entries.add(newReplicatedLogEntry(1, 4, "four"));
610
611         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
612
613         follower = createBehavior(context);
614
615         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
616
617         Assert.assertSame(follower, newBehavior);
618
619         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
620     }
621
622     @Test
623     public void testHandleAppendEntriesWithExistingLogEntry() {
624         logStart("testHandleAppendEntriesWithExistingLogEntry");
625
626         MockRaftActorContext context = createActorContext();
627
628         context.getTermInformation().update(1, "test");
629
630         // Prepare the receivers log
631         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
632         log.append(newReplicatedLogEntry(1, 0, "zero"));
633         log.append(newReplicatedLogEntry(1, 1, "one"));
634
635         context.setReplicatedLog(log);
636
637         // Send the last entry again.
638         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
639
640         follower = createBehavior(context);
641
642         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
643
644         assertEquals("Next index", 2, log.last().getIndex() + 1);
645         assertEquals("Entry 1", entries.get(0), log.get(1));
646
647         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
648
649         // Send the last entry again and also a new one.
650
651         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
652
653         leaderActor.underlyingActor().clear();
654         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
655
656         assertEquals("Next index", 3, log.last().getIndex() + 1);
657         assertEquals("Entry 1", entries.get(0), log.get(1));
658         assertEquals("Entry 2", entries.get(1), log.get(2));
659
660         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
661     }
662
663     @Test
664     public void testHandleAppendEntriesAfterInstallingSnapshot(){
665         logStart("testHandleAppendAfterInstallingSnapshot");
666
667         MockRaftActorContext context = createActorContext();
668
669         // Prepare the receivers log
670         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
671
672         // Set up a log as if it has been snapshotted
673         log.setSnapshotIndex(3);
674         log.setSnapshotTerm(1);
675
676         context.setReplicatedLog(log);
677
678         // Prepare the entries to be sent with AppendEntries
679         List<ReplicatedLogEntry> entries = new ArrayList<>();
680         entries.add(newReplicatedLogEntry(1, 4, "four"));
681
682         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
683
684         follower = createBehavior(context);
685
686         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
687
688         Assert.assertSame(follower, newBehavior);
689
690         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
691     }
692
693
694     /**
695      * This test verifies that when InstallSnapshot is received by
696      * the follower its applied correctly.
697      *
698      * @throws Exception
699      */
700     @Test
701     public void testHandleInstallSnapshot() throws Exception {
702         logStart("testHandleInstallSnapshot");
703
704         MockRaftActorContext context = createActorContext();
705
706         follower = createBehavior(context);
707
708         ByteString bsSnapshot  = createSnapshot();
709         int offset = 0;
710         int snapshotLength = bsSnapshot.size();
711         int chunkSize = 50;
712         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
713         int lastIncludedIndex = 1;
714         int chunkIndex = 1;
715         InstallSnapshot lastInstallSnapshot = null;
716
717         for(int i = 0; i < totalChunks; i++) {
718             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
719             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
720                     chunkData, chunkIndex, totalChunks);
721             follower.handleMessage(leaderActor, lastInstallSnapshot);
722             offset = offset + 50;
723             lastIncludedIndex++;
724             chunkIndex++;
725         }
726
727         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
728                 ApplySnapshot.class);
729         Snapshot snapshot = applySnapshot.getSnapshot();
730         assertNotNull(lastInstallSnapshot);
731         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
732         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
733                 snapshot.getLastAppliedTerm());
734         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
735                 snapshot.getLastAppliedIndex());
736         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
737         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
738
739         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
740                 leaderActor, InstallSnapshotReply.class);
741         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
742
743         chunkIndex = 1;
744         for(InstallSnapshotReply reply: replies) {
745             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
746             assertEquals("getTerm", 1, reply.getTerm());
747             assertEquals("isSuccess", true, reply.isSuccess());
748             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
749         }
750
751         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
752     }
753
754
755     /**
756      * Verify that when an AppendEntries is sent to a follower during a snapshot install
757      * the Follower short-circuits the processing of the AppendEntries message.
758      *
759      * @throws Exception
760      */
761     @Test
762     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
763         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
764
765         MockRaftActorContext context = createActorContext();
766
767         follower = createBehavior(context);
768
769         ByteString bsSnapshot  = createSnapshot();
770         int snapshotLength = bsSnapshot.size();
771         int chunkSize = 50;
772         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
773         int lastIncludedIndex = 1;
774
775         // Check that snapshot installation is not in progress
776         assertNull(((Follower) follower).getSnapshotTracker());
777
778         // Make sure that we have more than 1 chunk to send
779         assertTrue(totalChunks > 1);
780
781         // Send an install snapshot with the first chunk to start the process of installing a snapshot
782         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
783         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
784                 chunkData, 1, totalChunks));
785
786         // Check if snapshot installation is in progress now
787         assertNotNull(((Follower) follower).getSnapshotTracker());
788
789         // Send an append entry
790         AppendEntries appendEntries = mock(AppendEntries.class);
791         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
792
793         follower.handleMessage(leaderActor, appendEntries);
794
795         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
796         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
797         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
798         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
799
800         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
801         verify(appendEntries, never()).getPrevLogIndex();
802
803     }
804
805     @Test
806     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
807         logStart("testInitialSyncUpWithHandleInstallSnapshot");
808
809         MockRaftActorContext context = createActorContext();
810
811         follower = createBehavior(context);
812
813         ByteString bsSnapshot  = createSnapshot();
814         int offset = 0;
815         int snapshotLength = bsSnapshot.size();
816         int chunkSize = 50;
817         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
818         int lastIncludedIndex = 1;
819         int chunkIndex = 1;
820         InstallSnapshot lastInstallSnapshot = null;
821
822         for(int i = 0; i < totalChunks; i++) {
823             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
824             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
825                     chunkData, chunkIndex, totalChunks);
826             follower.handleMessage(leaderActor, lastInstallSnapshot);
827             offset = offset + 50;
828             lastIncludedIndex++;
829             chunkIndex++;
830         }
831
832         FollowerInitialSyncUpStatus syncStatus =
833                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
834
835         assertFalse(syncStatus.isInitialSyncDone());
836
837         // Clear all the messages
838         followerActor.underlyingActor().clear();
839
840         context.setLastApplied(101);
841         context.setCommitIndex(101);
842         setLastLogEntry(context, 1, 101,
843                 new MockRaftActorContext.MockPayload(""));
844
845         List<ReplicatedLogEntry> entries = Arrays.asList(
846                 newReplicatedLogEntry(2, 101, "foo"));
847
848         // The new commitIndex is 101
849         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
850         follower.handleMessage(leaderActor, appendEntries);
851
852         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
853
854         assertTrue(syncStatus.isInitialSyncDone());
855     }
856
857     @Test
858     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
859         logStart("testHandleOutOfSequenceInstallSnapshot");
860
861         MockRaftActorContext context = createActorContext();
862
863         follower = createBehavior(context);
864
865         ByteString bsSnapshot = createSnapshot();
866
867         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
868                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
869         follower.handleMessage(leaderActor, installSnapshot);
870
871         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
872                 InstallSnapshotReply.class);
873
874         assertEquals("isSuccess", false, reply.isSuccess());
875         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
876         assertEquals("getTerm", 1, reply.getTerm());
877         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
878
879         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
880     }
881
882     @Test
883     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
884         MockRaftActorContext context = createActorContext();
885
886         Stopwatch stopwatch = Stopwatch.createStarted();
887
888         follower = createBehavior(context);
889
890         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
891
892         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
893
894         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
895     }
896
897     @Test
898     public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
899         MockRaftActorContext context = createActorContext();
900         context.setConfigParams(new DefaultConfigParamsImpl(){
901             @Override
902             public FiniteDuration getElectionTimeOutInterval() {
903                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
904             }
905         });
906
907         context.setRaftPolicy(createRaftPolicy(false, false));
908
909         follower = createBehavior(context);
910
911         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
912     }
913
914
915     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
916         int snapshotLength = bs.size();
917         int start = offset;
918         int size = chunkSize;
919         if (chunkSize > snapshotLength) {
920             size = snapshotLength;
921         } else {
922             if ((start + chunkSize) > snapshotLength) {
923                 size = snapshotLength - start;
924             }
925         }
926         return bs.substring(start, start + size);
927     }
928
929     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
930             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
931
932         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
933                 AppendEntriesReply.class);
934
935         assertEquals("isSuccess", expSuccess, reply.isSuccess());
936         assertEquals("getTerm", expTerm, reply.getTerm());
937         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
938         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
939         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
940         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
941     }
942
943     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
944         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
945                 new MockRaftActorContext.MockPayload(data));
946     }
947
948     private ByteString createSnapshot(){
949         HashMap<String, String> followerSnapshot = new HashMap<>();
950         followerSnapshot.put("1", "A");
951         followerSnapshot.put("2", "B");
952         followerSnapshot.put("3", "C");
953
954         return toByteString(followerSnapshot);
955     }
956
957     @Override
958     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
959             ActorRef actorRef, RaftRPC rpc) throws Exception {
960         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
961
962         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
963         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
964     }
965
966     @Override
967     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
968             throws Exception {
969         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
970         assertEquals("isSuccess", true, reply.isSuccess());
971     }
972 }