2cb0d7ce6546ad93d6cb71196e0bf020a4450fb3
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.Snapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
32 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
37 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
39 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import scala.concurrent.duration.FiniteDuration;
42
43 public class FollowerTest extends AbstractRaftActorBehaviorTest {
44
45     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
46             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
47
48     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
49             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
50
51     private RaftActorBehavior follower;
52
53     private final short payloadVersion = 5;
54
55     @Override
56     @After
57     public void tearDown() throws Exception {
58         if(follower != null) {
59             follower.close();
60         }
61
62         super.tearDown();
63     }
64
65     @Override
66     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
67         return new Follower(actorContext);
68     }
69
70     @Override
71     protected  MockRaftActorContext createActorContext() {
72         return createActorContext(followerActor);
73     }
74
75     @Override
76     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
77         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
78         context.setPayloadVersion(payloadVersion );
79         return context;
80     }
81
82     @Test
83     public void testThatAnElectionTimeoutIsTriggered(){
84         MockRaftActorContext actorContext = createActorContext();
85         follower = new Follower(actorContext);
86
87         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
88                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
89     }
90
91     @Test
92     public void testHandleElectionTimeout(){
93         logStart("testHandleElectionTimeout");
94
95         follower = new Follower(createActorContext());
96
97         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
98
99         assertTrue(raftBehavior instanceof Candidate);
100     }
101
102     @Test
103     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
104         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
105
106         RaftActorContext context = createActorContext();
107         long term = 1000;
108         context.getTermInformation().update(term, null);
109
110         follower = createBehavior(context);
111
112         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
113
114         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
115
116         assertEquals("isVoteGranted", true, reply.isVoteGranted());
117         assertEquals("getTerm", term, reply.getTerm());
118     }
119
120     @Test
121     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
122         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
123
124         RaftActorContext context = createActorContext();
125         long term = 1000;
126         context.getTermInformation().update(term, "test");
127
128         follower = createBehavior(context);
129
130         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
131
132         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
133
134         assertEquals("isVoteGranted", false, reply.isVoteGranted());
135     }
136
137
138     @Test
139     public void testHandleFirstAppendEntries() throws Exception {
140         logStart("testHandleFirstAppendEntries");
141
142         MockRaftActorContext context = createActorContext();
143         context.getReplicatedLog().clear(0,2);
144         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
145         context.getReplicatedLog().setSnapshotIndex(99);
146
147         List<ReplicatedLogEntry> entries = Arrays.asList(
148                 newReplicatedLogEntry(2, 101, "foo"));
149
150         Assert.assertEquals(1, context.getReplicatedLog().size());
151
152         // The new commitIndex is 101
153         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
154
155         follower = createBehavior(context);
156         follower.handleMessage(leaderActor, appendEntries);
157
158         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
159         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
160
161         assertFalse(syncStatus.isInitialSyncDone());
162         assertTrue("append entries reply should be true", reply.isSuccess());
163     }
164
165     @Test
166     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
167         logStart("testHandleFirstAppendEntries");
168
169         MockRaftActorContext context = createActorContext();
170
171         List<ReplicatedLogEntry> entries = Arrays.asList(
172                 newReplicatedLogEntry(2, 101, "foo"));
173
174         // The new commitIndex is 101
175         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
176
177         follower = createBehavior(context);
178         follower.handleMessage(leaderActor, appendEntries);
179
180         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
181         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
182
183         assertFalse(syncStatus.isInitialSyncDone());
184         assertFalse("append entries reply should be false", reply.isSuccess());
185     }
186
187     @Test
188     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
189         logStart("testHandleFirstAppendEntries");
190
191         MockRaftActorContext context = createActorContext();
192         context.getReplicatedLog().clear(0,2);
193         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
194         context.getReplicatedLog().setSnapshotIndex(99);
195
196         List<ReplicatedLogEntry> entries = Arrays.asList(
197                 newReplicatedLogEntry(2, 101, "foo"));
198
199         // The new commitIndex is 101
200         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
201
202         follower = createBehavior(context);
203         follower.handleMessage(leaderActor, appendEntries);
204
205         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
206         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
207
208         assertFalse(syncStatus.isInitialSyncDone());
209         assertTrue("append entries reply should be true", reply.isSuccess());
210     }
211
212     @Test
213     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
214         logStart("testHandleFirstAppendEntries");
215
216         MockRaftActorContext context = createActorContext();
217         context.getReplicatedLog().clear(0,2);
218         context.getReplicatedLog().setSnapshotIndex(100);
219
220         List<ReplicatedLogEntry> entries = Arrays.asList(
221                 newReplicatedLogEntry(2, 101, "foo"));
222
223         // The new commitIndex is 101
224         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
225
226         follower = createBehavior(context);
227         follower.handleMessage(leaderActor, appendEntries);
228
229         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
230         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
231
232         assertFalse(syncStatus.isInitialSyncDone());
233         assertTrue("append entries reply should be true", reply.isSuccess());
234     }
235
236     @Test
237     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
238         logStart("testHandleFirstAppendEntries");
239
240         MockRaftActorContext context = createActorContext();
241         context.getReplicatedLog().clear(0,2);
242         context.getReplicatedLog().setSnapshotIndex(100);
243
244         List<ReplicatedLogEntry> entries = Arrays.asList(
245                 newReplicatedLogEntry(2, 105, "foo"));
246
247         // The new commitIndex is 101
248         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
249
250         follower = createBehavior(context);
251         follower.handleMessage(leaderActor, appendEntries);
252
253         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
254         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
255
256         assertFalse(syncStatus.isInitialSyncDone());
257         assertFalse("append entries reply should be false", reply.isSuccess());
258     }
259
260     @Test
261     public void testHandleSyncUpAppendEntries() throws Exception {
262         logStart("testHandleSyncUpAppendEntries");
263
264         MockRaftActorContext context = createActorContext();
265
266         List<ReplicatedLogEntry> entries = Arrays.asList(
267                 newReplicatedLogEntry(2, 101, "foo"));
268
269         // The new commitIndex is 101
270         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
271
272         follower = createBehavior(context);
273         follower.handleMessage(leaderActor, appendEntries);
274
275         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
276
277         assertFalse(syncStatus.isInitialSyncDone());
278
279         // Clear all the messages
280         followerActor.underlyingActor().clear();
281
282         context.setLastApplied(101);
283         context.setCommitIndex(101);
284         setLastLogEntry(context, 1, 101,
285                 new MockRaftActorContext.MockPayload(""));
286
287         entries = Arrays.asList(
288                 newReplicatedLogEntry(2, 101, "foo"));
289
290         // The new commitIndex is 101
291         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
292         follower.handleMessage(leaderActor, appendEntries);
293
294         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
295
296         assertTrue(syncStatus.isInitialSyncDone());
297
298         followerActor.underlyingActor().clear();
299
300         // Sending the same message again should not generate another message
301
302         follower.handleMessage(leaderActor, appendEntries);
303
304         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
305
306         assertNull(syncStatus);
307
308     }
309
310     @Test
311     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
312         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
313
314         MockRaftActorContext context = createActorContext();
315
316         List<ReplicatedLogEntry> entries = Arrays.asList(
317                 newReplicatedLogEntry(2, 101, "foo"));
318
319         // The new commitIndex is 101
320         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
321
322         follower = createBehavior(context);
323         follower.handleMessage(leaderActor, appendEntries);
324
325         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
326
327         assertFalse(syncStatus.isInitialSyncDone());
328
329         // Clear all the messages
330         followerActor.underlyingActor().clear();
331
332         context.setLastApplied(100);
333         setLastLogEntry(context, 1, 100,
334                 new MockRaftActorContext.MockPayload(""));
335
336         entries = Arrays.asList(
337                 newReplicatedLogEntry(2, 101, "foo"));
338
339         // leader-2 is becoming the leader now and it says the commitIndex is 45
340         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
341         follower.handleMessage(leaderActor, appendEntries);
342
343         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
344
345         // We get a new message saying initial status is not done
346         assertFalse(syncStatus.isInitialSyncDone());
347
348     }
349
350
351     @Test
352     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
353         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
354
355         MockRaftActorContext context = createActorContext();
356
357         List<ReplicatedLogEntry> entries = Arrays.asList(
358                 newReplicatedLogEntry(2, 101, "foo"));
359
360         // The new commitIndex is 101
361         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
362
363         follower = createBehavior(context);
364         follower.handleMessage(leaderActor, appendEntries);
365
366         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
367
368         assertFalse(syncStatus.isInitialSyncDone());
369
370         // Clear all the messages
371         followerActor.underlyingActor().clear();
372
373         context.setLastApplied(101);
374         context.setCommitIndex(101);
375         setLastLogEntry(context, 1, 101,
376                 new MockRaftActorContext.MockPayload(""));
377
378         entries = Arrays.asList(
379                 newReplicatedLogEntry(2, 101, "foo"));
380
381         // The new commitIndex is 101
382         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
383         follower.handleMessage(leaderActor, appendEntries);
384
385         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
386
387         assertTrue(syncStatus.isInitialSyncDone());
388
389         // Clear all the messages
390         followerActor.underlyingActor().clear();
391
392         context.setLastApplied(100);
393         setLastLogEntry(context, 1, 100,
394                 new MockRaftActorContext.MockPayload(""));
395
396         entries = Arrays.asList(
397                 newReplicatedLogEntry(2, 101, "foo"));
398
399         // leader-2 is becoming the leader now and it says the commitIndex is 45
400         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
401         follower.handleMessage(leaderActor, appendEntries);
402
403         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
404
405         // We get a new message saying initial status is not done
406         assertFalse(syncStatus.isInitialSyncDone());
407
408     }
409
410
411     /**
412      * This test verifies that when an AppendEntries RPC is received by a RaftActor
413      * with a commitIndex that is greater than what has been applied to the
414      * state machine of the RaftActor, the RaftActor applies the state and
415      * sets it current applied state to the commitIndex of the sender.
416      *
417      * @throws Exception
418      */
419     @Test
420     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
421         logStart("testHandleAppendEntriesWithNewerCommitIndex");
422
423         MockRaftActorContext context = createActorContext();
424
425         context.setLastApplied(100);
426         setLastLogEntry(context, 1, 100,
427                 new MockRaftActorContext.MockPayload(""));
428         context.getReplicatedLog().setSnapshotIndex(99);
429
430         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
431                 newReplicatedLogEntry(2, 101, "foo"));
432
433         // The new commitIndex is 101
434         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
435
436         follower = createBehavior(context);
437         follower.handleMessage(leaderActor, appendEntries);
438
439         assertEquals("getLastApplied", 101L, context.getLastApplied());
440     }
441
442     /**
443      * This test verifies that when an AppendEntries is received a specific prevLogTerm
444      * which does not match the term that is in RaftActors log entry at prevLogIndex
445      * then the RaftActor does not change it's state and it returns a failure.
446      *
447      * @throws Exception
448      */
449     @Test
450     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
451         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
452
453         MockRaftActorContext context = createActorContext();
454
455         // First set the receivers term to lower number
456         context.getTermInformation().update(95, "test");
457
458         // AppendEntries is now sent with a bigger term
459         // this will set the receivers term to be the same as the sender's term
460         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
461
462         follower = createBehavior(context);
463
464         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
465
466         Assert.assertSame(follower, newBehavior);
467
468         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
469                 AppendEntriesReply.class);
470
471         assertEquals("isSuccess", false, reply.isSuccess());
472     }
473
474     /**
475      * This test verifies that when a new AppendEntries message is received with
476      * new entries and the logs of the sender and receiver match that the new
477      * entries get added to the log and the log is incremented by the number of
478      * entries received in appendEntries
479      *
480      * @throws Exception
481      */
482     @Test
483     public void testHandleAppendEntriesAddNewEntries() {
484         logStart("testHandleAppendEntriesAddNewEntries");
485
486         MockRaftActorContext context = createActorContext();
487
488         // First set the receivers term to lower number
489         context.getTermInformation().update(1, "test");
490
491         // Prepare the receivers log
492         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
493         log.append(newReplicatedLogEntry(1, 0, "zero"));
494         log.append(newReplicatedLogEntry(1, 1, "one"));
495         log.append(newReplicatedLogEntry(1, 2, "two"));
496
497         context.setReplicatedLog(log);
498
499         // Prepare the entries to be sent with AppendEntries
500         List<ReplicatedLogEntry> entries = new ArrayList<>();
501         entries.add(newReplicatedLogEntry(1, 3, "three"));
502         entries.add(newReplicatedLogEntry(1, 4, "four"));
503
504         // Send appendEntries with the same term as was set on the receiver
505         // before the new behavior was created (1 in this case)
506         // This will not work for a Candidate because as soon as a Candidate
507         // is created it increments the term
508         short leaderPayloadVersion = 10;
509         String leaderId = "leader-1";
510         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
511
512         follower = createBehavior(context);
513
514         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
515
516         Assert.assertSame(follower, newBehavior);
517
518         assertEquals("Next index", 5, log.last().getIndex() + 1);
519         assertEquals("Entry 3", entries.get(0), log.get(3));
520         assertEquals("Entry 4", entries.get(1), log.get(4));
521
522         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
523         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
524
525         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
526     }
527
528     /**
529      * This test verifies that when a new AppendEntries message is received with
530      * new entries and the logs of the sender and receiver are out-of-sync that
531      * the log is first corrected by removing the out of sync entries from the
532      * log and then adding in the new entries sent with the AppendEntries message
533      */
534     @Test
535     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
536         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
537
538         MockRaftActorContext context = createActorContext();
539
540         // First set the receivers term to lower number
541         context.getTermInformation().update(1, "test");
542
543         // Prepare the receivers log
544         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
545         log.append(newReplicatedLogEntry(1, 0, "zero"));
546         log.append(newReplicatedLogEntry(1, 1, "one"));
547         log.append(newReplicatedLogEntry(1, 2, "two"));
548
549         context.setReplicatedLog(log);
550
551         // Prepare the entries to be sent with AppendEntries
552         List<ReplicatedLogEntry> entries = new ArrayList<>();
553         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
554         entries.add(newReplicatedLogEntry(2, 3, "three"));
555
556         // Send appendEntries with the same term as was set on the receiver
557         // before the new behavior was created (1 in this case)
558         // This will not work for a Candidate because as soon as a Candidate
559         // is created it increments the term
560         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
561
562         follower = createBehavior(context);
563
564         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
565
566         Assert.assertSame(follower, newBehavior);
567
568         // The entry at index 2 will be found out-of-sync with the leader
569         // and will be removed
570         // Then the two new entries will be added to the log
571         // Thus making the log to have 4 entries
572         assertEquals("Next index", 4, log.last().getIndex() + 1);
573         //assertEquals("Entry 2", entries.get(0), log.get(2));
574
575         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
576
577         // Check that the entry at index 2 has the new data
578         assertEquals("Entry 2", entries.get(0), log.get(2));
579
580         assertEquals("Entry 3", entries.get(1), log.get(3));
581
582         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
583     }
584
585     @Test
586     public void testHandleAppendEntriesPreviousLogEntryMissing(){
587         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
588
589         MockRaftActorContext context = createActorContext();
590
591         // Prepare the receivers log
592         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
593         log.append(newReplicatedLogEntry(1, 0, "zero"));
594         log.append(newReplicatedLogEntry(1, 1, "one"));
595         log.append(newReplicatedLogEntry(1, 2, "two"));
596
597         context.setReplicatedLog(log);
598
599         // Prepare the entries to be sent with AppendEntries
600         List<ReplicatedLogEntry> entries = new ArrayList<>();
601         entries.add(newReplicatedLogEntry(1, 4, "four"));
602
603         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
604
605         follower = createBehavior(context);
606
607         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
608
609         Assert.assertSame(follower, newBehavior);
610
611         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
612     }
613
614     @Test
615     public void testHandleAppendEntriesWithExistingLogEntry() {
616         logStart("testHandleAppendEntriesWithExistingLogEntry");
617
618         MockRaftActorContext context = createActorContext();
619
620         context.getTermInformation().update(1, "test");
621
622         // Prepare the receivers log
623         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
624         log.append(newReplicatedLogEntry(1, 0, "zero"));
625         log.append(newReplicatedLogEntry(1, 1, "one"));
626
627         context.setReplicatedLog(log);
628
629         // Send the last entry again.
630         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
631
632         follower = createBehavior(context);
633
634         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
635
636         assertEquals("Next index", 2, log.last().getIndex() + 1);
637         assertEquals("Entry 1", entries.get(0), log.get(1));
638
639         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
640
641         // Send the last entry again and also a new one.
642
643         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
644
645         leaderActor.underlyingActor().clear();
646         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
647
648         assertEquals("Next index", 3, log.last().getIndex() + 1);
649         assertEquals("Entry 1", entries.get(0), log.get(1));
650         assertEquals("Entry 2", entries.get(1), log.get(2));
651
652         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
653     }
654
655     @Test
656     public void testHandleAppendEntriesAfterInstallingSnapshot(){
657         logStart("testHandleAppendAfterInstallingSnapshot");
658
659         MockRaftActorContext context = createActorContext();
660
661         // Prepare the receivers log
662         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
663
664         // Set up a log as if it has been snapshotted
665         log.setSnapshotIndex(3);
666         log.setSnapshotTerm(1);
667
668         context.setReplicatedLog(log);
669
670         // Prepare the entries to be sent with AppendEntries
671         List<ReplicatedLogEntry> entries = new ArrayList<>();
672         entries.add(newReplicatedLogEntry(1, 4, "four"));
673
674         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
675
676         follower = createBehavior(context);
677
678         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
679
680         Assert.assertSame(follower, newBehavior);
681
682         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
683     }
684
685
686     /**
687      * This test verifies that when InstallSnapshot is received by
688      * the follower its applied correctly.
689      *
690      * @throws Exception
691      */
692     @Test
693     public void testHandleInstallSnapshot() throws Exception {
694         logStart("testHandleInstallSnapshot");
695
696         MockRaftActorContext context = createActorContext();
697
698         follower = createBehavior(context);
699
700         ByteString bsSnapshot  = createSnapshot();
701         int offset = 0;
702         int snapshotLength = bsSnapshot.size();
703         int chunkSize = 50;
704         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
705         int lastIncludedIndex = 1;
706         int chunkIndex = 1;
707         InstallSnapshot lastInstallSnapshot = null;
708
709         for(int i = 0; i < totalChunks; i++) {
710             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
711             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
712                     chunkData, chunkIndex, totalChunks);
713             follower.handleMessage(leaderActor, lastInstallSnapshot);
714             offset = offset + 50;
715             lastIncludedIndex++;
716             chunkIndex++;
717         }
718
719         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
720                 ApplySnapshot.class);
721         Snapshot snapshot = applySnapshot.getSnapshot();
722         assertNotNull(lastInstallSnapshot);
723         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
724         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
725                 snapshot.getLastAppliedTerm());
726         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
727                 snapshot.getLastAppliedIndex());
728         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
729         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
730
731         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
732                 leaderActor, InstallSnapshotReply.class);
733         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
734
735         chunkIndex = 1;
736         for(InstallSnapshotReply reply: replies) {
737             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
738             assertEquals("getTerm", 1, reply.getTerm());
739             assertEquals("isSuccess", true, reply.isSuccess());
740             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
741         }
742
743         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
744     }
745
746
747     /**
748      * Verify that when an AppendEntries is sent to a follower during a snapshot install
749      * the Follower short-circuits the processing of the AppendEntries message.
750      *
751      * @throws Exception
752      */
753     @Test
754     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
755         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
756
757         MockRaftActorContext context = createActorContext();
758
759         follower = createBehavior(context);
760
761         ByteString bsSnapshot  = createSnapshot();
762         int snapshotLength = bsSnapshot.size();
763         int chunkSize = 50;
764         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
765         int lastIncludedIndex = 1;
766
767         // Check that snapshot installation is not in progress
768         assertNull(((Follower) follower).getSnapshotTracker());
769
770         // Make sure that we have more than 1 chunk to send
771         assertTrue(totalChunks > 1);
772
773         // Send an install snapshot with the first chunk to start the process of installing a snapshot
774         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
775         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
776                 chunkData, 1, totalChunks));
777
778         // Check if snapshot installation is in progress now
779         assertNotNull(((Follower) follower).getSnapshotTracker());
780
781         // Send an append entry
782         AppendEntries appendEntries = mock(AppendEntries.class);
783         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
784
785         follower.handleMessage(leaderActor, appendEntries);
786
787         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
788         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
789         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
790         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
791
792         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
793         verify(appendEntries, never()).getPrevLogIndex();
794
795     }
796
797     @Test
798     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
799         logStart("testInitialSyncUpWithHandleInstallSnapshot");
800
801         MockRaftActorContext context = createActorContext();
802
803         follower = createBehavior(context);
804
805         ByteString bsSnapshot  = createSnapshot();
806         int offset = 0;
807         int snapshotLength = bsSnapshot.size();
808         int chunkSize = 50;
809         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
810         int lastIncludedIndex = 1;
811         int chunkIndex = 1;
812         InstallSnapshot lastInstallSnapshot = null;
813
814         for(int i = 0; i < totalChunks; i++) {
815             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
816             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
817                     chunkData, chunkIndex, totalChunks);
818             follower.handleMessage(leaderActor, lastInstallSnapshot);
819             offset = offset + 50;
820             lastIncludedIndex++;
821             chunkIndex++;
822         }
823
824         FollowerInitialSyncUpStatus syncStatus =
825                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
826
827         assertFalse(syncStatus.isInitialSyncDone());
828
829         // Clear all the messages
830         followerActor.underlyingActor().clear();
831
832         context.setLastApplied(101);
833         context.setCommitIndex(101);
834         setLastLogEntry(context, 1, 101,
835                 new MockRaftActorContext.MockPayload(""));
836
837         List<ReplicatedLogEntry> entries = Arrays.asList(
838                 newReplicatedLogEntry(2, 101, "foo"));
839
840         // The new commitIndex is 101
841         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
842         follower.handleMessage(leaderActor, appendEntries);
843
844         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
845
846         assertTrue(syncStatus.isInitialSyncDone());
847     }
848
849     @Test
850     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
851         logStart("testHandleOutOfSequenceInstallSnapshot");
852
853         MockRaftActorContext context = createActorContext();
854
855         follower = createBehavior(context);
856
857         ByteString bsSnapshot = createSnapshot();
858
859         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
860                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
861         follower.handleMessage(leaderActor, installSnapshot);
862
863         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
864                 InstallSnapshotReply.class);
865
866         assertEquals("isSuccess", false, reply.isSuccess());
867         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
868         assertEquals("getTerm", 1, reply.getTerm());
869         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
870
871         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
872     }
873
874     @Test
875     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
876         MockRaftActorContext context = createActorContext();
877
878         Stopwatch stopwatch = Stopwatch.createStarted();
879
880         follower = createBehavior(context);
881
882         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
883
884         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
885
886         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
887     }
888
889     @Test
890     public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
891         MockRaftActorContext context = createActorContext();
892         context.setConfigParams(new DefaultConfigParamsImpl(){
893             @Override
894             public FiniteDuration getElectionTimeOutInterval() {
895                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
896             }
897         });
898
899         context.setRaftPolicy(createRaftPolicy(false, false));
900
901         follower = createBehavior(context);
902
903         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
904     }
905
906
907     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
908         int snapshotLength = bs.size();
909         int start = offset;
910         int size = chunkSize;
911         if (chunkSize > snapshotLength) {
912             size = snapshotLength;
913         } else {
914             if ((start + chunkSize) > snapshotLength) {
915                 size = snapshotLength - start;
916             }
917         }
918         return bs.substring(start, start + size);
919     }
920
921     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
922             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
923
924         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
925                 AppendEntriesReply.class);
926
927         assertEquals("isSuccess", expSuccess, reply.isSuccess());
928         assertEquals("getTerm", expTerm, reply.getTerm());
929         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
930         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
931         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
932         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
933     }
934
935     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
936         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
937                 new MockRaftActorContext.MockPayload(data));
938     }
939
940     private ByteString createSnapshot(){
941         HashMap<String, String> followerSnapshot = new HashMap<>();
942         followerSnapshot.put("1", "A");
943         followerSnapshot.put("2", "B");
944         followerSnapshot.put("3", "C");
945
946         return toByteString(followerSnapshot);
947     }
948
949     @Override
950     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
951             ActorRef actorRef, RaftRPC rpc) throws Exception {
952         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
953
954         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
955         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
956     }
957
958     @Override
959     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
960             throws Exception {
961         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
962         assertEquals("isSuccess", true, reply.isSuccess());
963     }
964 }