Use partial mocking for FollowerTest
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.testkit.TestActorRef;
25 import com.google.common.base.Stopwatch;
26 import com.google.protobuf.ByteString;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
36 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
37 import org.opendaylight.controller.cluster.raft.RaftActorContext;
38 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
41 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
42 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
43 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
45 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
46 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
47 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
48 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
49 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
50 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
51 import scala.concurrent.duration.FiniteDuration;
52
53 public class FollowerTest extends AbstractRaftActorBehaviorTest {
54
55     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
56             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
57
58     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
59             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
60
61     private Follower follower;
62
63     private final short payloadVersion = 5;
64
65     @Override
66     @After
67     public void tearDown() throws Exception {
68         if(follower != null) {
69             follower.close();
70         }
71
72         super.tearDown();
73     }
74
75     @Override
76     protected Follower createBehavior(RaftActorContext actorContext) {
77         return spy(new Follower(actorContext));
78     }
79
80     @Override
81     protected  MockRaftActorContext createActorContext() {
82         return createActorContext(followerActor);
83     }
84
85     @Override
86     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
87         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
88         context.setPayloadVersion(payloadVersion );
89         return context;
90     }
91
92     @Test
93     public void testThatAnElectionTimeoutIsTriggered(){
94         MockRaftActorContext actorContext = createActorContext();
95         follower = new Follower(actorContext);
96
97         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
98                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
99     }
100
101     @Test
102     public void testHandleElectionTimeout(){
103         logStart("testHandleElectionTimeout");
104
105         follower = new Follower(createActorContext());
106
107         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE);
108
109         assertTrue(raftBehavior instanceof Candidate);
110     }
111
112     @Test
113     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
114         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
115
116         RaftActorContext context = createActorContext();
117         long term = 1000;
118         context.getTermInformation().update(term, null);
119
120         follower = createBehavior(context);
121
122         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
123
124         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
125
126         assertEquals("isVoteGranted", true, reply.isVoteGranted());
127         assertEquals("getTerm", term, reply.getTerm());
128         verify(follower).scheduleElection(any(FiniteDuration.class));
129     }
130
131     @Test
132     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
133         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
134
135         RaftActorContext context = createActorContext();
136         long term = 1000;
137         context.getTermInformation().update(term, "test");
138
139         follower = createBehavior(context);
140
141         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
142
143         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
144
145         assertEquals("isVoteGranted", false, reply.isVoteGranted());
146         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
147     }
148
149
150     @Test
151     public void testHandleFirstAppendEntries() throws Exception {
152         logStart("testHandleFirstAppendEntries");
153
154         MockRaftActorContext context = createActorContext();
155         context.getReplicatedLog().clear(0,2);
156         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
157         context.getReplicatedLog().setSnapshotIndex(99);
158
159         List<ReplicatedLogEntry> entries = Arrays.asList(
160                 newReplicatedLogEntry(2, 101, "foo"));
161
162         Assert.assertEquals(1, context.getReplicatedLog().size());
163
164         // The new commitIndex is 101
165         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
166
167         follower = createBehavior(context);
168         follower.handleMessage(leaderActor, appendEntries);
169
170         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
171         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
172
173         assertFalse(syncStatus.isInitialSyncDone());
174         assertTrue("append entries reply should be true", reply.isSuccess());
175     }
176
177     @Test
178     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
179         logStart("testHandleFirstAppendEntries");
180
181         MockRaftActorContext context = createActorContext();
182
183         List<ReplicatedLogEntry> entries = Arrays.asList(
184                 newReplicatedLogEntry(2, 101, "foo"));
185
186         // The new commitIndex is 101
187         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
188
189         follower = createBehavior(context);
190         follower.handleMessage(leaderActor, appendEntries);
191
192         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
193         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
194
195         assertFalse(syncStatus.isInitialSyncDone());
196         assertFalse("append entries reply should be false", reply.isSuccess());
197     }
198
199     @Test
200     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
201         logStart("testHandleFirstAppendEntries");
202
203         MockRaftActorContext context = createActorContext();
204         context.getReplicatedLog().clear(0,2);
205         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
206         context.getReplicatedLog().setSnapshotIndex(99);
207
208         List<ReplicatedLogEntry> entries = Arrays.asList(
209                 newReplicatedLogEntry(2, 101, "foo"));
210
211         // The new commitIndex is 101
212         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
213
214         follower = createBehavior(context);
215         follower.handleMessage(leaderActor, appendEntries);
216
217         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
218         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
219
220         assertFalse(syncStatus.isInitialSyncDone());
221         assertTrue("append entries reply should be true", reply.isSuccess());
222     }
223
224     @Test
225     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
226         logStart("testHandleFirstAppendEntries");
227
228         MockRaftActorContext context = createActorContext();
229         context.getReplicatedLog().clear(0,2);
230         context.getReplicatedLog().setSnapshotIndex(100);
231
232         List<ReplicatedLogEntry> entries = Arrays.asList(
233                 newReplicatedLogEntry(2, 101, "foo"));
234
235         // The new commitIndex is 101
236         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
237
238         follower = createBehavior(context);
239         follower.handleMessage(leaderActor, appendEntries);
240
241         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
242         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
243
244         assertFalse(syncStatus.isInitialSyncDone());
245         assertTrue("append entries reply should be true", reply.isSuccess());
246     }
247
248     @Test
249     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
250         logStart("testHandleFirstAppendEntries");
251
252         MockRaftActorContext context = createActorContext();
253         context.getReplicatedLog().clear(0,2);
254         context.getReplicatedLog().setSnapshotIndex(100);
255
256         List<ReplicatedLogEntry> entries = Arrays.asList(
257                 newReplicatedLogEntry(2, 105, "foo"));
258
259         // The new commitIndex is 101
260         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
261
262         follower = createBehavior(context);
263         follower.handleMessage(leaderActor, appendEntries);
264
265         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
266         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
267
268         assertFalse(syncStatus.isInitialSyncDone());
269         assertFalse("append entries reply should be false", reply.isSuccess());
270     }
271
272     @Test
273     public void testHandleSyncUpAppendEntries() throws Exception {
274         logStart("testHandleSyncUpAppendEntries");
275
276         MockRaftActorContext context = createActorContext();
277
278         List<ReplicatedLogEntry> entries = Arrays.asList(
279                 newReplicatedLogEntry(2, 101, "foo"));
280
281         // The new commitIndex is 101
282         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
283
284         follower = createBehavior(context);
285         follower.handleMessage(leaderActor, appendEntries);
286
287         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
288
289         assertFalse(syncStatus.isInitialSyncDone());
290
291         // Clear all the messages
292         followerActor.underlyingActor().clear();
293
294         context.setLastApplied(101);
295         context.setCommitIndex(101);
296         setLastLogEntry(context, 1, 101,
297                 new MockRaftActorContext.MockPayload(""));
298
299         entries = Arrays.asList(
300                 newReplicatedLogEntry(2, 101, "foo"));
301
302         // The new commitIndex is 101
303         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
304         follower.handleMessage(leaderActor, appendEntries);
305
306         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
307
308         assertTrue(syncStatus.isInitialSyncDone());
309
310         followerActor.underlyingActor().clear();
311
312         // Sending the same message again should not generate another message
313
314         follower.handleMessage(leaderActor, appendEntries);
315
316         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
317
318         assertNull(syncStatus);
319
320     }
321
322     @Test
323     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
324         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
325
326         MockRaftActorContext context = createActorContext();
327
328         List<ReplicatedLogEntry> entries = Arrays.asList(
329                 newReplicatedLogEntry(2, 101, "foo"));
330
331         // The new commitIndex is 101
332         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
333
334         follower = createBehavior(context);
335         follower.handleMessage(leaderActor, appendEntries);
336
337         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
338
339         assertFalse(syncStatus.isInitialSyncDone());
340
341         // Clear all the messages
342         followerActor.underlyingActor().clear();
343
344         context.setLastApplied(100);
345         setLastLogEntry(context, 1, 100,
346                 new MockRaftActorContext.MockPayload(""));
347
348         entries = Arrays.asList(
349                 newReplicatedLogEntry(2, 101, "foo"));
350
351         // leader-2 is becoming the leader now and it says the commitIndex is 45
352         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
353         follower.handleMessage(leaderActor, appendEntries);
354
355         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
356
357         // We get a new message saying initial status is not done
358         assertFalse(syncStatus.isInitialSyncDone());
359
360     }
361
362
363     @Test
364     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
365         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
366
367         MockRaftActorContext context = createActorContext();
368
369         List<ReplicatedLogEntry> entries = Arrays.asList(
370                 newReplicatedLogEntry(2, 101, "foo"));
371
372         // The new commitIndex is 101
373         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
374
375         follower = createBehavior(context);
376         follower.handleMessage(leaderActor, appendEntries);
377
378         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
379
380         assertFalse(syncStatus.isInitialSyncDone());
381
382         // Clear all the messages
383         followerActor.underlyingActor().clear();
384
385         context.setLastApplied(101);
386         context.setCommitIndex(101);
387         setLastLogEntry(context, 1, 101,
388                 new MockRaftActorContext.MockPayload(""));
389
390         entries = Arrays.asList(
391                 newReplicatedLogEntry(2, 101, "foo"));
392
393         // The new commitIndex is 101
394         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
395         follower.handleMessage(leaderActor, appendEntries);
396
397         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
398
399         assertTrue(syncStatus.isInitialSyncDone());
400
401         // Clear all the messages
402         followerActor.underlyingActor().clear();
403
404         context.setLastApplied(100);
405         setLastLogEntry(context, 1, 100,
406                 new MockRaftActorContext.MockPayload(""));
407
408         entries = Arrays.asList(
409                 newReplicatedLogEntry(2, 101, "foo"));
410
411         // leader-2 is becoming the leader now and it says the commitIndex is 45
412         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
413         follower.handleMessage(leaderActor, appendEntries);
414
415         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
416
417         // We get a new message saying initial status is not done
418         assertFalse(syncStatus.isInitialSyncDone());
419
420     }
421
422
423     /**
424      * This test verifies that when an AppendEntries RPC is received by a RaftActor
425      * with a commitIndex that is greater than what has been applied to the
426      * state machine of the RaftActor, the RaftActor applies the state and
427      * sets it current applied state to the commitIndex of the sender.
428      *
429      * @throws Exception
430      */
431     @Test
432     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
433         logStart("testHandleAppendEntriesWithNewerCommitIndex");
434
435         MockRaftActorContext context = createActorContext();
436
437         context.setLastApplied(100);
438         setLastLogEntry(context, 1, 100,
439                 new MockRaftActorContext.MockPayload(""));
440         context.getReplicatedLog().setSnapshotIndex(99);
441
442         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
443                 newReplicatedLogEntry(2, 101, "foo"));
444
445         // The new commitIndex is 101
446         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
447
448         follower = createBehavior(context);
449         follower.handleMessage(leaderActor, appendEntries);
450
451         assertEquals("getLastApplied", 101L, context.getLastApplied());
452     }
453
454     /**
455      * This test verifies that when an AppendEntries is received a specific prevLogTerm
456      * which does not match the term that is in RaftActors log entry at prevLogIndex
457      * then the RaftActor does not change it's state and it returns a failure.
458      *
459      * @throws Exception
460      */
461     @Test
462     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
463         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
464
465         MockRaftActorContext context = createActorContext();
466
467         // First set the receivers term to lower number
468         context.getTermInformation().update(95, "test");
469
470         // AppendEntries is now sent with a bigger term
471         // this will set the receivers term to be the same as the sender's term
472         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
473
474         follower = createBehavior(context);
475
476         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
477
478         Assert.assertSame(follower, newBehavior);
479
480         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
481                 AppendEntriesReply.class);
482
483         assertEquals("isSuccess", false, reply.isSuccess());
484     }
485
486     /**
487      * This test verifies that when a new AppendEntries message is received with
488      * new entries and the logs of the sender and receiver match that the new
489      * entries get added to the log and the log is incremented by the number of
490      * entries received in appendEntries
491      *
492      * @throws Exception
493      */
494     @Test
495     public void testHandleAppendEntriesAddNewEntries() {
496         logStart("testHandleAppendEntriesAddNewEntries");
497
498         MockRaftActorContext context = createActorContext();
499
500         // First set the receivers term to lower number
501         context.getTermInformation().update(1, "test");
502
503         // Prepare the receivers log
504         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
505         log.append(newReplicatedLogEntry(1, 0, "zero"));
506         log.append(newReplicatedLogEntry(1, 1, "one"));
507         log.append(newReplicatedLogEntry(1, 2, "two"));
508
509         context.setReplicatedLog(log);
510
511         // Prepare the entries to be sent with AppendEntries
512         List<ReplicatedLogEntry> entries = new ArrayList<>();
513         entries.add(newReplicatedLogEntry(1, 3, "three"));
514         entries.add(newReplicatedLogEntry(1, 4, "four"));
515
516         // Send appendEntries with the same term as was set on the receiver
517         // before the new behavior was created (1 in this case)
518         // This will not work for a Candidate because as soon as a Candidate
519         // is created it increments the term
520         short leaderPayloadVersion = 10;
521         String leaderId = "leader-1";
522         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
523
524         follower = createBehavior(context);
525
526         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
527
528         Assert.assertSame(follower, newBehavior);
529
530         assertEquals("Next index", 5, log.last().getIndex() + 1);
531         assertEquals("Entry 3", entries.get(0), log.get(3));
532         assertEquals("Entry 4", entries.get(1), log.get(4));
533
534         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
535         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
536
537         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
538     }
539
540     /**
541      * This test verifies that when a new AppendEntries message is received with
542      * new entries and the logs of the sender and receiver are out-of-sync that
543      * the log is first corrected by removing the out of sync entries from the
544      * log and then adding in the new entries sent with the AppendEntries message
545      */
546     @Test
547     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
548         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
549
550         MockRaftActorContext context = createActorContext();
551
552         // First set the receivers term to lower number
553         context.getTermInformation().update(1, "test");
554
555         // Prepare the receivers log
556         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
557         log.append(newReplicatedLogEntry(1, 0, "zero"));
558         log.append(newReplicatedLogEntry(1, 1, "one"));
559         log.append(newReplicatedLogEntry(1, 2, "two"));
560
561         context.setReplicatedLog(log);
562
563         // Prepare the entries to be sent with AppendEntries
564         List<ReplicatedLogEntry> entries = new ArrayList<>();
565         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
566         entries.add(newReplicatedLogEntry(2, 3, "three"));
567
568         // Send appendEntries with the same term as was set on the receiver
569         // before the new behavior was created (1 in this case)
570         // This will not work for a Candidate because as soon as a Candidate
571         // is created it increments the term
572         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
573
574         follower = createBehavior(context);
575
576         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
577
578         Assert.assertSame(follower, newBehavior);
579
580         // The entry at index 2 will be found out-of-sync with the leader
581         // and will be removed
582         // Then the two new entries will be added to the log
583         // Thus making the log to have 4 entries
584         assertEquals("Next index", 4, log.last().getIndex() + 1);
585         //assertEquals("Entry 2", entries.get(0), log.get(2));
586
587         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
588
589         // Check that the entry at index 2 has the new data
590         assertEquals("Entry 2", entries.get(0), log.get(2));
591
592         assertEquals("Entry 3", entries.get(1), log.get(3));
593
594         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
595     }
596
597     @Test
598     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
599         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
600
601         MockRaftActorContext context = createActorContext();
602
603         // First set the receivers term to lower number
604         context.getTermInformation().update(1, "test");
605
606         // Prepare the receivers log
607         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
608         log.append(newReplicatedLogEntry(1, 0, "zero"));
609         log.append(newReplicatedLogEntry(1, 1, "one"));
610         log.append(newReplicatedLogEntry(1, 2, "two"));
611
612         context.setReplicatedLog(log);
613
614         // Prepare the entries to be sent with AppendEntries
615         List<ReplicatedLogEntry> entries = new ArrayList<>();
616         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
617         entries.add(newReplicatedLogEntry(2, 3, "three"));
618
619         // Send appendEntries with the same term as was set on the receiver
620         // before the new behavior was created (1 in this case)
621         // This will not work for a Candidate because as soon as a Candidate
622         // is created it increments the term
623         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
624
625         context.setRaftPolicy(createRaftPolicy(false, true));
626         follower = createBehavior(context);
627
628         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
629
630         Assert.assertSame(follower, newBehavior);
631
632         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
633     }
634
635     @Test
636     public void testHandleAppendEntriesPreviousLogEntryMissing(){
637         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
638
639         MockRaftActorContext context = createActorContext();
640
641         // Prepare the receivers log
642         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
643         log.append(newReplicatedLogEntry(1, 0, "zero"));
644         log.append(newReplicatedLogEntry(1, 1, "one"));
645         log.append(newReplicatedLogEntry(1, 2, "two"));
646
647         context.setReplicatedLog(log);
648
649         // Prepare the entries to be sent with AppendEntries
650         List<ReplicatedLogEntry> entries = new ArrayList<>();
651         entries.add(newReplicatedLogEntry(1, 4, "four"));
652
653         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
654
655         follower = createBehavior(context);
656
657         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
658
659         Assert.assertSame(follower, newBehavior);
660
661         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
662     }
663
664     @Test
665     public void testHandleAppendEntriesWithExistingLogEntry() {
666         logStart("testHandleAppendEntriesWithExistingLogEntry");
667
668         MockRaftActorContext context = createActorContext();
669
670         context.getTermInformation().update(1, "test");
671
672         // Prepare the receivers log
673         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
674         log.append(newReplicatedLogEntry(1, 0, "zero"));
675         log.append(newReplicatedLogEntry(1, 1, "one"));
676
677         context.setReplicatedLog(log);
678
679         // Send the last entry again.
680         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
681
682         follower = createBehavior(context);
683
684         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
685
686         assertEquals("Next index", 2, log.last().getIndex() + 1);
687         assertEquals("Entry 1", entries.get(0), log.get(1));
688
689         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
690
691         // Send the last entry again and also a new one.
692
693         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
694
695         leaderActor.underlyingActor().clear();
696         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
697
698         assertEquals("Next index", 3, log.last().getIndex() + 1);
699         assertEquals("Entry 1", entries.get(0), log.get(1));
700         assertEquals("Entry 2", entries.get(1), log.get(2));
701
702         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
703     }
704
705     @Test
706     public void testHandleAppendEntriesAfterInstallingSnapshot(){
707         logStart("testHandleAppendAfterInstallingSnapshot");
708
709         MockRaftActorContext context = createActorContext();
710
711         // Prepare the receivers log
712         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
713
714         // Set up a log as if it has been snapshotted
715         log.setSnapshotIndex(3);
716         log.setSnapshotTerm(1);
717
718         context.setReplicatedLog(log);
719
720         // Prepare the entries to be sent with AppendEntries
721         List<ReplicatedLogEntry> entries = new ArrayList<>();
722         entries.add(newReplicatedLogEntry(1, 4, "four"));
723
724         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
725
726         follower = createBehavior(context);
727
728         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
729
730         Assert.assertSame(follower, newBehavior);
731
732         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
733     }
734
735
736     /**
737      * This test verifies that when InstallSnapshot is received by
738      * the follower its applied correctly.
739      *
740      * @throws Exception
741      */
742     @Test
743     public void testHandleInstallSnapshot() throws Exception {
744         logStart("testHandleInstallSnapshot");
745
746         MockRaftActorContext context = createActorContext();
747         context.getTermInformation().update(1, "leader");
748
749         follower = createBehavior(context);
750
751         ByteString bsSnapshot  = createSnapshot();
752         int offset = 0;
753         int snapshotLength = bsSnapshot.size();
754         int chunkSize = 50;
755         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
756         int lastIncludedIndex = 1;
757         int chunkIndex = 1;
758         InstallSnapshot lastInstallSnapshot = null;
759
760         for(int i = 0; i < totalChunks; i++) {
761             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
762             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
763                     chunkData, chunkIndex, totalChunks);
764             follower.handleMessage(leaderActor, lastInstallSnapshot);
765             offset = offset + 50;
766             lastIncludedIndex++;
767             chunkIndex++;
768         }
769
770         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
771                 ApplySnapshot.class);
772         Snapshot snapshot = applySnapshot.getSnapshot();
773         assertNotNull(lastInstallSnapshot);
774         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
775         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
776                 snapshot.getLastAppliedTerm());
777         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
778                 snapshot.getLastAppliedIndex());
779         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
780         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
781         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
782         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
783         applySnapshot.getCallback().onSuccess();
784
785         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
786                 leaderActor, InstallSnapshotReply.class);
787         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
788
789         chunkIndex = 1;
790         for(InstallSnapshotReply reply: replies) {
791             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
792             assertEquals("getTerm", 1, reply.getTerm());
793             assertEquals("isSuccess", true, reply.isSuccess());
794             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
795         }
796
797         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
798     }
799
800
801     /**
802      * Verify that when an AppendEntries is sent to a follower during a snapshot install
803      * the Follower short-circuits the processing of the AppendEntries message.
804      *
805      * @throws Exception
806      */
807     @Test
808     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
809         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
810
811         MockRaftActorContext context = createActorContext();
812
813         follower = createBehavior(context);
814
815         ByteString bsSnapshot  = createSnapshot();
816         int snapshotLength = bsSnapshot.size();
817         int chunkSize = 50;
818         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
819         int lastIncludedIndex = 1;
820
821         // Check that snapshot installation is not in progress
822         assertNull(follower.getSnapshotTracker());
823
824         // Make sure that we have more than 1 chunk to send
825         assertTrue(totalChunks > 1);
826
827         // Send an install snapshot with the first chunk to start the process of installing a snapshot
828         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
829         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
830                 chunkData, 1, totalChunks));
831
832         // Check if snapshot installation is in progress now
833         assertNotNull(follower.getSnapshotTracker());
834
835         // Send an append entry
836         AppendEntries appendEntries = mock(AppendEntries.class);
837         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
838
839         follower.handleMessage(leaderActor, appendEntries);
840
841         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
842         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
843         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
844         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
845
846         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
847         verify(appendEntries, never()).getPrevLogIndex();
848
849     }
850
851     @Test
852     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
853         logStart("testInitialSyncUpWithHandleInstallSnapshot");
854
855         MockRaftActorContext context = createActorContext();
856
857         follower = createBehavior(context);
858
859         ByteString bsSnapshot  = createSnapshot();
860         int offset = 0;
861         int snapshotLength = bsSnapshot.size();
862         int chunkSize = 50;
863         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
864         int lastIncludedIndex = 1;
865         int chunkIndex = 1;
866         InstallSnapshot lastInstallSnapshot = null;
867
868         for(int i = 0; i < totalChunks; i++) {
869             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
870             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
871                     chunkData, chunkIndex, totalChunks);
872             follower.handleMessage(leaderActor, lastInstallSnapshot);
873             offset = offset + 50;
874             lastIncludedIndex++;
875             chunkIndex++;
876         }
877
878         FollowerInitialSyncUpStatus syncStatus =
879                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
880
881         assertFalse(syncStatus.isInitialSyncDone());
882
883         // Clear all the messages
884         followerActor.underlyingActor().clear();
885
886         context.setLastApplied(101);
887         context.setCommitIndex(101);
888         setLastLogEntry(context, 1, 101,
889                 new MockRaftActorContext.MockPayload(""));
890
891         List<ReplicatedLogEntry> entries = Arrays.asList(
892                 newReplicatedLogEntry(2, 101, "foo"));
893
894         // The new commitIndex is 101
895         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
896         follower.handleMessage(leaderActor, appendEntries);
897
898         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
899
900         assertTrue(syncStatus.isInitialSyncDone());
901     }
902
903     @Test
904     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
905         logStart("testHandleOutOfSequenceInstallSnapshot");
906
907         MockRaftActorContext context = createActorContext();
908
909         follower = createBehavior(context);
910
911         ByteString bsSnapshot = createSnapshot();
912
913         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
914                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
915         follower.handleMessage(leaderActor, installSnapshot);
916
917         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
918                 InstallSnapshotReply.class);
919
920         assertEquals("isSuccess", false, reply.isSuccess());
921         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
922         assertEquals("getTerm", 1, reply.getTerm());
923         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
924
925         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
926     }
927
928     @Test
929     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
930         MockRaftActorContext context = createActorContext();
931
932         Stopwatch stopwatch = Stopwatch.createStarted();
933
934         follower = createBehavior(context);
935
936         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
937
938         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
939
940         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
941     }
942
943     @Test
944     public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
945         MockRaftActorContext context = createActorContext();
946         context.setConfigParams(new DefaultConfigParamsImpl(){
947             @Override
948             public FiniteDuration getElectionTimeOutInterval() {
949                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
950             }
951         });
952
953         context.setRaftPolicy(createRaftPolicy(false, false));
954
955         follower = createBehavior(context);
956
957         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
958     }
959
960     @Test
961     public void testElectionScheduledWhenAnyRaftRPCReceived(){
962         MockRaftActorContext context = createActorContext();
963         follower = createBehavior(context);
964         follower.handleMessage(leaderActor, new RaftRPC() {
965             private static final long serialVersionUID = 1L;
966
967             @Override
968             public long getTerm() {
969                 return 100;
970             }
971         });
972         verify(follower).scheduleElection(any(FiniteDuration.class));
973     }
974
975     @Test
976     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
977         MockRaftActorContext context = createActorContext();
978         follower = createBehavior(context);
979         follower.handleMessage(leaderActor, "non-raft-rpc");
980         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
981     }
982
983     public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
984         int snapshotLength = bs.size();
985         int start = offset;
986         int size = chunkSize;
987         if (chunkSize > snapshotLength) {
988             size = snapshotLength;
989         } else {
990             if ((start + chunkSize) > snapshotLength) {
991                 size = snapshotLength - start;
992             }
993         }
994
995         byte[] nextChunk = new byte[size];
996         bs.copyTo(nextChunk, start, 0, size);
997         return nextChunk;
998     }
999
1000     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1001             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1002         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1003     }
1004
1005     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1006                                                    String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1007                                                    boolean expForceInstallSnapshot) {
1008
1009         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1010                 AppendEntriesReply.class);
1011
1012         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1013         assertEquals("getTerm", expTerm, reply.getTerm());
1014         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1015         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1016         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1017         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1018         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1019     }
1020
1021
1022     private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1023         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1024                 new MockRaftActorContext.MockPayload(data));
1025     }
1026
1027     private ByteString createSnapshot(){
1028         HashMap<String, String> followerSnapshot = new HashMap<>();
1029         followerSnapshot.put("1", "A");
1030         followerSnapshot.put("2", "B");
1031         followerSnapshot.put("3", "C");
1032
1033         return toByteString(followerSnapshot);
1034     }
1035
1036     @Override
1037     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1038             ActorRef actorRef, RaftRPC rpc) throws Exception {
1039         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1040
1041         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1042         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1043     }
1044
1045     @Override
1046     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
1047             throws Exception {
1048         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1049         assertEquals("isSuccess", true, reply.isSuccess());
1050     }
1051 }