1e931109c3e9234e73cf3c94f31b1a3e7ecf3978
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.Snapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
32 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
37 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
39 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import scala.concurrent.duration.FiniteDuration;
42
43 public class FollowerTest extends AbstractRaftActorBehaviorTest {
44
45     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
46             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
47
48     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
49             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
50
51     private RaftActorBehavior follower;
52
53     private final short payloadVersion = 5;
54
55     @Override
56     @After
57     public void tearDown() throws Exception {
58         if(follower != null) {
59             follower.close();
60         }
61
62         super.tearDown();
63     }
64
65     @Override
66     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
67         return new Follower(actorContext);
68     }
69
70     @Override
71     protected  MockRaftActorContext createActorContext() {
72         return createActorContext(followerActor);
73     }
74
75     @Override
76     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
77         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
78         context.setPayloadVersion(payloadVersion );
79         return context;
80     }
81
82     @Test
83     public void testThatAnElectionTimeoutIsTriggered(){
84         MockRaftActorContext actorContext = createActorContext();
85         follower = new Follower(actorContext);
86
87         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
88                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
89     }
90
91     @Test
92     public void testHandleElectionTimeout(){
93         logStart("testHandleElectionTimeout");
94
95         follower = new Follower(createActorContext());
96
97         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
98
99         assertTrue(raftBehavior instanceof Candidate);
100     }
101
102     @Test
103     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
104         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
105
106         RaftActorContext context = createActorContext();
107         long term = 1000;
108         context.getTermInformation().update(term, null);
109
110         follower = createBehavior(context);
111
112         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
113
114         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
115
116         assertEquals("isVoteGranted", true, reply.isVoteGranted());
117         assertEquals("getTerm", term, reply.getTerm());
118     }
119
120     @Test
121     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
122         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
123
124         RaftActorContext context = createActorContext();
125         long term = 1000;
126         context.getTermInformation().update(term, "test");
127
128         follower = createBehavior(context);
129
130         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
131
132         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
133
134         assertEquals("isVoteGranted", false, reply.isVoteGranted());
135     }
136
137
138     @Test
139     public void testHandleFirstAppendEntries() throws Exception {
140         logStart("testHandleFirstAppendEntries");
141
142         MockRaftActorContext context = createActorContext();
143
144         List<ReplicatedLogEntry> entries = Arrays.asList(
145                 newReplicatedLogEntry(2, 101, "foo"));
146
147         // The new commitIndex is 101
148         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
149
150         follower = createBehavior(context);
151         follower.handleMessage(leaderActor, appendEntries);
152
153         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
154
155         assertFalse(syncStatus.isInitialSyncDone());
156     }
157
158     @Test
159     public void testHandleSyncUpAppendEntries() throws Exception {
160         logStart("testHandleSyncUpAppendEntries");
161
162         MockRaftActorContext context = createActorContext();
163
164         List<ReplicatedLogEntry> entries = Arrays.asList(
165                 newReplicatedLogEntry(2, 101, "foo"));
166
167         // The new commitIndex is 101
168         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
169
170         follower = createBehavior(context);
171         follower.handleMessage(leaderActor, appendEntries);
172
173         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
174
175         assertFalse(syncStatus.isInitialSyncDone());
176
177         // Clear all the messages
178         followerActor.underlyingActor().clear();
179
180         context.setLastApplied(101);
181         context.setCommitIndex(101);
182         setLastLogEntry(context, 1, 101,
183                 new MockRaftActorContext.MockPayload(""));
184
185         entries = Arrays.asList(
186                 newReplicatedLogEntry(2, 101, "foo"));
187
188         // The new commitIndex is 101
189         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
190         follower.handleMessage(leaderActor, appendEntries);
191
192         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
193
194         assertTrue(syncStatus.isInitialSyncDone());
195
196         followerActor.underlyingActor().clear();
197
198         // Sending the same message again should not generate another message
199
200         follower.handleMessage(leaderActor, appendEntries);
201
202         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
203
204         assertNull(syncStatus);
205
206     }
207
208     @Test
209     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
210         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
211
212         MockRaftActorContext context = createActorContext();
213
214         List<ReplicatedLogEntry> entries = Arrays.asList(
215                 newReplicatedLogEntry(2, 101, "foo"));
216
217         // The new commitIndex is 101
218         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
219
220         follower = createBehavior(context);
221         follower.handleMessage(leaderActor, appendEntries);
222
223         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
224
225         assertFalse(syncStatus.isInitialSyncDone());
226
227         // Clear all the messages
228         followerActor.underlyingActor().clear();
229
230         context.setLastApplied(100);
231         setLastLogEntry(context, 1, 100,
232                 new MockRaftActorContext.MockPayload(""));
233
234         entries = Arrays.asList(
235                 newReplicatedLogEntry(2, 101, "foo"));
236
237         // leader-2 is becoming the leader now and it says the commitIndex is 45
238         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
239         follower.handleMessage(leaderActor, appendEntries);
240
241         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
242
243         // We get a new message saying initial status is not done
244         assertFalse(syncStatus.isInitialSyncDone());
245
246     }
247
248
249     @Test
250     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
251         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
252
253         MockRaftActorContext context = createActorContext();
254
255         List<ReplicatedLogEntry> entries = Arrays.asList(
256                 newReplicatedLogEntry(2, 101, "foo"));
257
258         // The new commitIndex is 101
259         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
260
261         follower = createBehavior(context);
262         follower.handleMessage(leaderActor, appendEntries);
263
264         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
265
266         assertFalse(syncStatus.isInitialSyncDone());
267
268         // Clear all the messages
269         followerActor.underlyingActor().clear();
270
271         context.setLastApplied(101);
272         context.setCommitIndex(101);
273         setLastLogEntry(context, 1, 101,
274                 new MockRaftActorContext.MockPayload(""));
275
276         entries = Arrays.asList(
277                 newReplicatedLogEntry(2, 101, "foo"));
278
279         // The new commitIndex is 101
280         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
281         follower.handleMessage(leaderActor, appendEntries);
282
283         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
284
285         assertTrue(syncStatus.isInitialSyncDone());
286
287         // Clear all the messages
288         followerActor.underlyingActor().clear();
289
290         context.setLastApplied(100);
291         setLastLogEntry(context, 1, 100,
292                 new MockRaftActorContext.MockPayload(""));
293
294         entries = Arrays.asList(
295                 newReplicatedLogEntry(2, 101, "foo"));
296
297         // leader-2 is becoming the leader now and it says the commitIndex is 45
298         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
299         follower.handleMessage(leaderActor, appendEntries);
300
301         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
302
303         // We get a new message saying initial status is not done
304         assertFalse(syncStatus.isInitialSyncDone());
305
306     }
307
308
309     /**
310      * This test verifies that when an AppendEntries RPC is received by a RaftActor
311      * with a commitIndex that is greater than what has been applied to the
312      * state machine of the RaftActor, the RaftActor applies the state and
313      * sets it current applied state to the commitIndex of the sender.
314      *
315      * @throws Exception
316      */
317     @Test
318     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
319         logStart("testHandleAppendEntriesWithNewerCommitIndex");
320
321         MockRaftActorContext context = createActorContext();
322
323         context.setLastApplied(100);
324         setLastLogEntry(context, 1, 100,
325                 new MockRaftActorContext.MockPayload(""));
326         context.getReplicatedLog().setSnapshotIndex(99);
327
328         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
329                 newReplicatedLogEntry(2, 101, "foo"));
330
331         // The new commitIndex is 101
332         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
333
334         follower = createBehavior(context);
335         follower.handleMessage(leaderActor, appendEntries);
336
337         assertEquals("getLastApplied", 101L, context.getLastApplied());
338     }
339
340     /**
341      * This test verifies that when an AppendEntries is received a specific prevLogTerm
342      * which does not match the term that is in RaftActors log entry at prevLogIndex
343      * then the RaftActor does not change it's state and it returns a failure.
344      *
345      * @throws Exception
346      */
347     @Test
348     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
349         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
350
351         MockRaftActorContext context = createActorContext();
352
353         // First set the receivers term to lower number
354         context.getTermInformation().update(95, "test");
355
356         // AppendEntries is now sent with a bigger term
357         // this will set the receivers term to be the same as the sender's term
358         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
359
360         follower = createBehavior(context);
361
362         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
363
364         Assert.assertSame(follower, newBehavior);
365
366         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
367                 AppendEntriesReply.class);
368
369         assertEquals("isSuccess", false, reply.isSuccess());
370     }
371
372     /**
373      * This test verifies that when a new AppendEntries message is received with
374      * new entries and the logs of the sender and receiver match that the new
375      * entries get added to the log and the log is incremented by the number of
376      * entries received in appendEntries
377      *
378      * @throws Exception
379      */
380     @Test
381     public void testHandleAppendEntriesAddNewEntries() {
382         logStart("testHandleAppendEntriesAddNewEntries");
383
384         MockRaftActorContext context = createActorContext();
385
386         // First set the receivers term to lower number
387         context.getTermInformation().update(1, "test");
388
389         // Prepare the receivers log
390         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
391         log.append(newReplicatedLogEntry(1, 0, "zero"));
392         log.append(newReplicatedLogEntry(1, 1, "one"));
393         log.append(newReplicatedLogEntry(1, 2, "two"));
394
395         context.setReplicatedLog(log);
396
397         // Prepare the entries to be sent with AppendEntries
398         List<ReplicatedLogEntry> entries = new ArrayList<>();
399         entries.add(newReplicatedLogEntry(1, 3, "three"));
400         entries.add(newReplicatedLogEntry(1, 4, "four"));
401
402         // Send appendEntries with the same term as was set on the receiver
403         // before the new behavior was created (1 in this case)
404         // This will not work for a Candidate because as soon as a Candidate
405         // is created it increments the term
406         short leaderPayloadVersion = 10;
407         String leaderId = "leader-1";
408         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
409
410         follower = createBehavior(context);
411
412         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
413
414         Assert.assertSame(follower, newBehavior);
415
416         assertEquals("Next index", 5, log.last().getIndex() + 1);
417         assertEquals("Entry 3", entries.get(0), log.get(3));
418         assertEquals("Entry 4", entries.get(1), log.get(4));
419
420         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
421         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
422
423         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
424     }
425
426     /**
427      * This test verifies that when a new AppendEntries message is received with
428      * new entries and the logs of the sender and receiver are out-of-sync that
429      * the log is first corrected by removing the out of sync entries from the
430      * log and then adding in the new entries sent with the AppendEntries message
431      */
432     @Test
433     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
434         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
435
436         MockRaftActorContext context = createActorContext();
437
438         // First set the receivers term to lower number
439         context.getTermInformation().update(1, "test");
440
441         // Prepare the receivers log
442         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
443         log.append(newReplicatedLogEntry(1, 0, "zero"));
444         log.append(newReplicatedLogEntry(1, 1, "one"));
445         log.append(newReplicatedLogEntry(1, 2, "two"));
446
447         context.setReplicatedLog(log);
448
449         // Prepare the entries to be sent with AppendEntries
450         List<ReplicatedLogEntry> entries = new ArrayList<>();
451         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
452         entries.add(newReplicatedLogEntry(2, 3, "three"));
453
454         // Send appendEntries with the same term as was set on the receiver
455         // before the new behavior was created (1 in this case)
456         // This will not work for a Candidate because as soon as a Candidate
457         // is created it increments the term
458         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
459
460         follower = createBehavior(context);
461
462         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
463
464         Assert.assertSame(follower, newBehavior);
465
466         // The entry at index 2 will be found out-of-sync with the leader
467         // and will be removed
468         // Then the two new entries will be added to the log
469         // Thus making the log to have 4 entries
470         assertEquals("Next index", 4, log.last().getIndex() + 1);
471         //assertEquals("Entry 2", entries.get(0), log.get(2));
472
473         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
474
475         // Check that the entry at index 2 has the new data
476         assertEquals("Entry 2", entries.get(0), log.get(2));
477
478         assertEquals("Entry 3", entries.get(1), log.get(3));
479
480         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
481     }
482
483     @Test
484     public void testHandleAppendEntriesPreviousLogEntryMissing(){
485         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
486
487         MockRaftActorContext context = createActorContext();
488
489         // Prepare the receivers log
490         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
491         log.append(newReplicatedLogEntry(1, 0, "zero"));
492         log.append(newReplicatedLogEntry(1, 1, "one"));
493         log.append(newReplicatedLogEntry(1, 2, "two"));
494
495         context.setReplicatedLog(log);
496
497         // Prepare the entries to be sent with AppendEntries
498         List<ReplicatedLogEntry> entries = new ArrayList<>();
499         entries.add(newReplicatedLogEntry(1, 4, "four"));
500
501         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
502
503         follower = createBehavior(context);
504
505         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
506
507         Assert.assertSame(follower, newBehavior);
508
509         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
510     }
511
512     @Test
513     public void testHandleAppendEntriesWithExistingLogEntry() {
514         logStart("testHandleAppendEntriesWithExistingLogEntry");
515
516         MockRaftActorContext context = createActorContext();
517
518         context.getTermInformation().update(1, "test");
519
520         // Prepare the receivers log
521         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
522         log.append(newReplicatedLogEntry(1, 0, "zero"));
523         log.append(newReplicatedLogEntry(1, 1, "one"));
524
525         context.setReplicatedLog(log);
526
527         // Send the last entry again.
528         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
529
530         follower = createBehavior(context);
531
532         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
533
534         assertEquals("Next index", 2, log.last().getIndex() + 1);
535         assertEquals("Entry 1", entries.get(0), log.get(1));
536
537         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
538
539         // Send the last entry again and also a new one.
540
541         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
542
543         leaderActor.underlyingActor().clear();
544         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
545
546         assertEquals("Next index", 3, log.last().getIndex() + 1);
547         assertEquals("Entry 1", entries.get(0), log.get(1));
548         assertEquals("Entry 2", entries.get(1), log.get(2));
549
550         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
551     }
552
553     @Test
554     public void testHandleAppendEntriesAfterInstallingSnapshot(){
555         logStart("testHandleAppendAfterInstallingSnapshot");
556
557         MockRaftActorContext context = createActorContext();
558
559         // Prepare the receivers log
560         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
561
562         // Set up a log as if it has been snapshotted
563         log.setSnapshotIndex(3);
564         log.setSnapshotTerm(1);
565
566         context.setReplicatedLog(log);
567
568         // Prepare the entries to be sent with AppendEntries
569         List<ReplicatedLogEntry> entries = new ArrayList<>();
570         entries.add(newReplicatedLogEntry(1, 4, "four"));
571
572         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
573
574         follower = createBehavior(context);
575
576         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
577
578         Assert.assertSame(follower, newBehavior);
579
580         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
581     }
582
583
584     /**
585      * This test verifies that when InstallSnapshot is received by
586      * the follower its applied correctly.
587      *
588      * @throws Exception
589      */
590     @Test
591     public void testHandleInstallSnapshot() throws Exception {
592         logStart("testHandleInstallSnapshot");
593
594         MockRaftActorContext context = createActorContext();
595
596         follower = createBehavior(context);
597
598         ByteString bsSnapshot  = createSnapshot();
599         int offset = 0;
600         int snapshotLength = bsSnapshot.size();
601         int chunkSize = 50;
602         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
603         int lastIncludedIndex = 1;
604         int chunkIndex = 1;
605         InstallSnapshot lastInstallSnapshot = null;
606
607         for(int i = 0; i < totalChunks; i++) {
608             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
609             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
610                     chunkData, chunkIndex, totalChunks);
611             follower.handleMessage(leaderActor, lastInstallSnapshot);
612             offset = offset + 50;
613             lastIncludedIndex++;
614             chunkIndex++;
615         }
616
617         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
618                 ApplySnapshot.class);
619         Snapshot snapshot = applySnapshot.getSnapshot();
620         assertNotNull(lastInstallSnapshot);
621         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
622         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
623                 snapshot.getLastAppliedTerm());
624         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
625                 snapshot.getLastAppliedIndex());
626         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
627         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
628
629         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
630                 leaderActor, InstallSnapshotReply.class);
631         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
632
633         chunkIndex = 1;
634         for(InstallSnapshotReply reply: replies) {
635             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
636             assertEquals("getTerm", 1, reply.getTerm());
637             assertEquals("isSuccess", true, reply.isSuccess());
638             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
639         }
640
641         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
642     }
643
644
645     /**
646      * Verify that when an AppendEntries is sent to a follower during a snapshot install
647      * the Follower short-circuits the processing of the AppendEntries message.
648      *
649      * @throws Exception
650      */
651     @Test
652     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
653         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
654
655         MockRaftActorContext context = createActorContext();
656
657         follower = createBehavior(context);
658
659         ByteString bsSnapshot  = createSnapshot();
660         int snapshotLength = bsSnapshot.size();
661         int chunkSize = 50;
662         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
663         int lastIncludedIndex = 1;
664
665         // Check that snapshot installation is not in progress
666         assertNull(((Follower) follower).getSnapshotTracker());
667
668         // Make sure that we have more than 1 chunk to send
669         assertTrue(totalChunks > 1);
670
671         // Send an install snapshot with the first chunk to start the process of installing a snapshot
672         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
673         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
674                 chunkData, 1, totalChunks));
675
676         // Check if snapshot installation is in progress now
677         assertNotNull(((Follower) follower).getSnapshotTracker());
678
679         // Send an append entry
680         AppendEntries appendEntries = mock(AppendEntries.class);
681         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
682
683         follower.handleMessage(leaderActor, appendEntries);
684
685         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
686         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
687         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
688         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
689
690         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
691         verify(appendEntries, never()).getPrevLogIndex();
692
693     }
694
695     @Test
696     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
697         logStart("testInitialSyncUpWithHandleInstallSnapshot");
698
699         MockRaftActorContext context = createActorContext();
700
701         follower = createBehavior(context);
702
703         ByteString bsSnapshot  = createSnapshot();
704         int offset = 0;
705         int snapshotLength = bsSnapshot.size();
706         int chunkSize = 50;
707         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
708         int lastIncludedIndex = 1;
709         int chunkIndex = 1;
710         InstallSnapshot lastInstallSnapshot = null;
711
712         for(int i = 0; i < totalChunks; i++) {
713             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
714             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
715                     chunkData, chunkIndex, totalChunks);
716             follower.handleMessage(leaderActor, lastInstallSnapshot);
717             offset = offset + 50;
718             lastIncludedIndex++;
719             chunkIndex++;
720         }
721
722         FollowerInitialSyncUpStatus syncStatus =
723                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
724
725         assertFalse(syncStatus.isInitialSyncDone());
726
727         // Clear all the messages
728         followerActor.underlyingActor().clear();
729
730         context.setLastApplied(101);
731         context.setCommitIndex(101);
732         setLastLogEntry(context, 1, 101,
733                 new MockRaftActorContext.MockPayload(""));
734
735         List<ReplicatedLogEntry> entries = Arrays.asList(
736                 newReplicatedLogEntry(2, 101, "foo"));
737
738         // The new commitIndex is 101
739         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
740         follower.handleMessage(leaderActor, appendEntries);
741
742         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
743
744         assertTrue(syncStatus.isInitialSyncDone());
745     }
746
747     @Test
748     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
749         logStart("testHandleOutOfSequenceInstallSnapshot");
750
751         MockRaftActorContext context = createActorContext();
752
753         follower = createBehavior(context);
754
755         ByteString bsSnapshot = createSnapshot();
756
757         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
758                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
759         follower.handleMessage(leaderActor, installSnapshot);
760
761         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
762                 InstallSnapshotReply.class);
763
764         assertEquals("isSuccess", false, reply.isSuccess());
765         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
766         assertEquals("getTerm", 1, reply.getTerm());
767         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
768
769         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
770     }
771
772     @Test
773     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
774         MockRaftActorContext context = createActorContext();
775
776         Stopwatch stopwatch = Stopwatch.createStarted();
777
778         follower = createBehavior(context);
779
780         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
781
782         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
783
784         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
785     }
786
787     @Test
788     public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
789         MockRaftActorContext context = createActorContext();
790         context.setConfigParams(new DefaultConfigParamsImpl(){
791             @Override
792             public FiniteDuration getElectionTimeOutInterval() {
793                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
794             }
795         });
796
797         context.setRaftPolicy(createRaftPolicy(false, false));
798
799         follower = createBehavior(context);
800
801         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
802     }
803
804
805     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
806         int snapshotLength = bs.size();
807         int start = offset;
808         int size = chunkSize;
809         if (chunkSize > snapshotLength) {
810             size = snapshotLength;
811         } else {
812             if ((start + chunkSize) > snapshotLength) {
813                 size = snapshotLength - start;
814             }
815         }
816         return bs.substring(start, start + size);
817     }
818
819     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
820             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
821
822         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
823                 AppendEntriesReply.class);
824
825         assertEquals("isSuccess", expSuccess, reply.isSuccess());
826         assertEquals("getTerm", expTerm, reply.getTerm());
827         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
828         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
829         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
830         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
831     }
832
833     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
834         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
835                 new MockRaftActorContext.MockPayload(data));
836     }
837
838     private ByteString createSnapshot(){
839         HashMap<String, String> followerSnapshot = new HashMap<>();
840         followerSnapshot.put("1", "A");
841         followerSnapshot.put("2", "B");
842         followerSnapshot.put("3", "C");
843
844         return toByteString(followerSnapshot);
845     }
846
847     @Override
848     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
849             ActorRef actorRef, RaftRPC rpc) throws Exception {
850         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
851
852         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
853         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
854     }
855
856     @Override
857     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
858             throws Exception {
859         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
860         assertEquals("isSuccess", true, reply.isSuccess());
861     }
862 }