Clear leaderId when election timeout occurs in non-voting follower
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.verify;
23 import akka.actor.ActorRef;
24 import akka.actor.Props;
25 import akka.testkit.TestActorRef;
26 import com.google.common.base.Stopwatch;
27 import com.google.protobuf.ByteString;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Assert;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
41 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
42 import org.opendaylight.controller.cluster.raft.Snapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
45 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
48 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
49 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
50 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
51 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
52 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
53 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
54 import scala.concurrent.duration.FiniteDuration;
55
56 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
57
58     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
59             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
60
61     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
62             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
63
64     private Follower follower;
65
66     private final short payloadVersion = 5;
67
68     @Override
69     @After
70     public void tearDown() throws Exception {
71         if(follower != null) {
72             follower.close();
73         }
74
75         super.tearDown();
76     }
77
78     @Override
79     protected Follower createBehavior(RaftActorContext actorContext) {
80         return spy(new Follower(actorContext));
81     }
82
83     @Override
84     protected  MockRaftActorContext createActorContext() {
85         return createActorContext(followerActor);
86     }
87
88     @Override
89     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
90         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
91         context.setPayloadVersion(payloadVersion );
92         return context;
93     }
94
95     @Test
96     public void testThatAnElectionTimeoutIsTriggered(){
97         MockRaftActorContext actorContext = createActorContext();
98         follower = new Follower(actorContext);
99
100         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
101                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
102     }
103
104     @Test
105     public void testHandleElectionTimeout(){
106         logStart("testHandleElectionTimeout");
107
108         follower = new Follower(createActorContext());
109
110         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE);
111
112         assertTrue(raftBehavior instanceof Candidate);
113     }
114
115     @Test
116     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
117         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
118
119         MockRaftActorContext context = createActorContext();
120         long term = 1000;
121         context.getTermInformation().update(term, null);
122
123         follower = createBehavior(context);
124
125         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
126
127         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
128
129         assertEquals("isVoteGranted", true, reply.isVoteGranted());
130         assertEquals("getTerm", term, reply.getTerm());
131         verify(follower).scheduleElection(any(FiniteDuration.class));
132     }
133
134     @Test
135     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
136         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
137
138         MockRaftActorContext context = createActorContext();
139         long term = 1000;
140         context.getTermInformation().update(term, "test");
141
142         follower = createBehavior(context);
143
144         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
145
146         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
147
148         assertEquals("isVoteGranted", false, reply.isVoteGranted());
149         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
150     }
151
152
153     @Test
154     public void testHandleFirstAppendEntries() throws Exception {
155         logStart("testHandleFirstAppendEntries");
156
157         MockRaftActorContext context = createActorContext();
158         context.getReplicatedLog().clear(0,2);
159         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
160         context.getReplicatedLog().setSnapshotIndex(99);
161
162         List<ReplicatedLogEntry> entries = Arrays.asList(
163                 newReplicatedLogEntry(2, 101, "foo"));
164
165         Assert.assertEquals(1, context.getReplicatedLog().size());
166
167         // The new commitIndex is 101
168         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
169
170         follower = createBehavior(context);
171         follower.handleMessage(leaderActor, appendEntries);
172
173         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
174         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
175
176         assertFalse(syncStatus.isInitialSyncDone());
177         assertTrue("append entries reply should be true", reply.isSuccess());
178     }
179
180     @Test
181     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
182         logStart("testHandleFirstAppendEntries");
183
184         MockRaftActorContext context = createActorContext();
185
186         List<ReplicatedLogEntry> entries = Arrays.asList(
187                 newReplicatedLogEntry(2, 101, "foo"));
188
189         // The new commitIndex is 101
190         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
191
192         follower = createBehavior(context);
193         follower.handleMessage(leaderActor, appendEntries);
194
195         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
196         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
197
198         assertFalse(syncStatus.isInitialSyncDone());
199         assertFalse("append entries reply should be false", reply.isSuccess());
200     }
201
202     @Test
203     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
204         logStart("testHandleFirstAppendEntries");
205
206         MockRaftActorContext context = createActorContext();
207         context.getReplicatedLog().clear(0,2);
208         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
209         context.getReplicatedLog().setSnapshotIndex(99);
210
211         List<ReplicatedLogEntry> entries = Arrays.asList(
212                 newReplicatedLogEntry(2, 101, "foo"));
213
214         // The new commitIndex is 101
215         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
216
217         follower = createBehavior(context);
218         follower.handleMessage(leaderActor, appendEntries);
219
220         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
221         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
222
223         assertFalse(syncStatus.isInitialSyncDone());
224         assertTrue("append entries reply should be true", reply.isSuccess());
225     }
226
227     @Test
228     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
229         logStart("testHandleFirstAppendEntries");
230
231         MockRaftActorContext context = createActorContext();
232         context.getReplicatedLog().clear(0,2);
233         context.getReplicatedLog().setSnapshotIndex(100);
234
235         List<ReplicatedLogEntry> entries = Arrays.asList(
236                 newReplicatedLogEntry(2, 101, "foo"));
237
238         // The new commitIndex is 101
239         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
240
241         follower = createBehavior(context);
242         follower.handleMessage(leaderActor, appendEntries);
243
244         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
245         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
246
247         assertFalse(syncStatus.isInitialSyncDone());
248         assertTrue("append entries reply should be true", reply.isSuccess());
249     }
250
251     @Test
252     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
253         logStart("testHandleFirstAppendEntries");
254
255         MockRaftActorContext context = createActorContext();
256         context.getReplicatedLog().clear(0,2);
257         context.getReplicatedLog().setSnapshotIndex(100);
258
259         List<ReplicatedLogEntry> entries = Arrays.asList(
260                 newReplicatedLogEntry(2, 105, "foo"));
261
262         // The new commitIndex is 101
263         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
264
265         follower = createBehavior(context);
266         follower.handleMessage(leaderActor, appendEntries);
267
268         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
269         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
270
271         assertFalse(syncStatus.isInitialSyncDone());
272         assertFalse("append entries reply should be false", reply.isSuccess());
273     }
274
275     @Test
276     public void testHandleSyncUpAppendEntries() throws Exception {
277         logStart("testHandleSyncUpAppendEntries");
278
279         MockRaftActorContext context = createActorContext();
280
281         List<ReplicatedLogEntry> entries = Arrays.asList(
282                 newReplicatedLogEntry(2, 101, "foo"));
283
284         // The new commitIndex is 101
285         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
286
287         follower = createBehavior(context);
288         follower.handleMessage(leaderActor, appendEntries);
289
290         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
291
292         assertFalse(syncStatus.isInitialSyncDone());
293
294         // Clear all the messages
295         followerActor.underlyingActor().clear();
296
297         context.setLastApplied(101);
298         context.setCommitIndex(101);
299         setLastLogEntry(context, 1, 101,
300                 new MockRaftActorContext.MockPayload(""));
301
302         entries = Arrays.asList(
303                 newReplicatedLogEntry(2, 101, "foo"));
304
305         // The new commitIndex is 101
306         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
307         follower.handleMessage(leaderActor, appendEntries);
308
309         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
310
311         assertTrue(syncStatus.isInitialSyncDone());
312
313         followerActor.underlyingActor().clear();
314
315         // Sending the same message again should not generate another message
316
317         follower.handleMessage(leaderActor, appendEntries);
318
319         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
320
321         assertNull(syncStatus);
322
323     }
324
325     @Test
326     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
327         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
328
329         MockRaftActorContext context = createActorContext();
330
331         List<ReplicatedLogEntry> entries = Arrays.asList(
332                 newReplicatedLogEntry(2, 101, "foo"));
333
334         // The new commitIndex is 101
335         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
336
337         follower = createBehavior(context);
338         follower.handleMessage(leaderActor, appendEntries);
339
340         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
341
342         assertFalse(syncStatus.isInitialSyncDone());
343
344         // Clear all the messages
345         followerActor.underlyingActor().clear();
346
347         context.setLastApplied(100);
348         setLastLogEntry(context, 1, 100,
349                 new MockRaftActorContext.MockPayload(""));
350
351         entries = Arrays.asList(
352                 newReplicatedLogEntry(2, 101, "foo"));
353
354         // leader-2 is becoming the leader now and it says the commitIndex is 45
355         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
356         follower.handleMessage(leaderActor, appendEntries);
357
358         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
359
360         // We get a new message saying initial status is not done
361         assertFalse(syncStatus.isInitialSyncDone());
362
363     }
364
365
366     @Test
367     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
368         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
369
370         MockRaftActorContext context = createActorContext();
371
372         List<ReplicatedLogEntry> entries = Arrays.asList(
373                 newReplicatedLogEntry(2, 101, "foo"));
374
375         // The new commitIndex is 101
376         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
377
378         follower = createBehavior(context);
379         follower.handleMessage(leaderActor, appendEntries);
380
381         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
382
383         assertFalse(syncStatus.isInitialSyncDone());
384
385         // Clear all the messages
386         followerActor.underlyingActor().clear();
387
388         context.setLastApplied(101);
389         context.setCommitIndex(101);
390         setLastLogEntry(context, 1, 101,
391                 new MockRaftActorContext.MockPayload(""));
392
393         entries = Arrays.asList(
394                 newReplicatedLogEntry(2, 101, "foo"));
395
396         // The new commitIndex is 101
397         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
398         follower.handleMessage(leaderActor, appendEntries);
399
400         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
401
402         assertTrue(syncStatus.isInitialSyncDone());
403
404         // Clear all the messages
405         followerActor.underlyingActor().clear();
406
407         context.setLastApplied(100);
408         setLastLogEntry(context, 1, 100,
409                 new MockRaftActorContext.MockPayload(""));
410
411         entries = Arrays.asList(
412                 newReplicatedLogEntry(2, 101, "foo"));
413
414         // leader-2 is becoming the leader now and it says the commitIndex is 45
415         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
416         follower.handleMessage(leaderActor, appendEntries);
417
418         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
419
420         // We get a new message saying initial status is not done
421         assertFalse(syncStatus.isInitialSyncDone());
422
423     }
424
425
426     /**
427      * This test verifies that when an AppendEntries RPC is received by a RaftActor
428      * with a commitIndex that is greater than what has been applied to the
429      * state machine of the RaftActor, the RaftActor applies the state and
430      * sets it current applied state to the commitIndex of the sender.
431      *
432      * @throws Exception
433      */
434     @Test
435     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
436         logStart("testHandleAppendEntriesWithNewerCommitIndex");
437
438         MockRaftActorContext context = createActorContext();
439
440         context.setLastApplied(100);
441         setLastLogEntry(context, 1, 100,
442                 new MockRaftActorContext.MockPayload(""));
443         context.getReplicatedLog().setSnapshotIndex(99);
444
445         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
446                 newReplicatedLogEntry(2, 101, "foo"));
447
448         // The new commitIndex is 101
449         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
450
451         follower = createBehavior(context);
452         follower.handleMessage(leaderActor, appendEntries);
453
454         assertEquals("getLastApplied", 101L, context.getLastApplied());
455     }
456
457     /**
458      * This test verifies that when an AppendEntries is received a specific prevLogTerm
459      * which does not match the term that is in RaftActors log entry at prevLogIndex
460      * then the RaftActor does not change it's state and it returns a failure.
461      *
462      * @throws Exception
463      */
464     @Test
465     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
466         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
467
468         MockRaftActorContext context = createActorContext();
469
470         // First set the receivers term to lower number
471         context.getTermInformation().update(95, "test");
472
473         // AppendEntries is now sent with a bigger term
474         // this will set the receivers term to be the same as the sender's term
475         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
476
477         follower = createBehavior(context);
478
479         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
480
481         Assert.assertSame(follower, newBehavior);
482
483         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
484                 AppendEntriesReply.class);
485
486         assertEquals("isSuccess", false, reply.isSuccess());
487     }
488
489     /**
490      * This test verifies that when a new AppendEntries message is received with
491      * new entries and the logs of the sender and receiver match that the new
492      * entries get added to the log and the log is incremented by the number of
493      * entries received in appendEntries
494      *
495      * @throws Exception
496      */
497     @Test
498     public void testHandleAppendEntriesAddNewEntries() {
499         logStart("testHandleAppendEntriesAddNewEntries");
500
501         MockRaftActorContext context = createActorContext();
502
503         // First set the receivers term to lower number
504         context.getTermInformation().update(1, "test");
505
506         // Prepare the receivers log
507         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
508         log.append(newReplicatedLogEntry(1, 0, "zero"));
509         log.append(newReplicatedLogEntry(1, 1, "one"));
510         log.append(newReplicatedLogEntry(1, 2, "two"));
511
512         context.setReplicatedLog(log);
513
514         // Prepare the entries to be sent with AppendEntries
515         List<ReplicatedLogEntry> entries = new ArrayList<>();
516         entries.add(newReplicatedLogEntry(1, 3, "three"));
517         entries.add(newReplicatedLogEntry(1, 4, "four"));
518
519         // Send appendEntries with the same term as was set on the receiver
520         // before the new behavior was created (1 in this case)
521         // This will not work for a Candidate because as soon as a Candidate
522         // is created it increments the term
523         short leaderPayloadVersion = 10;
524         String leaderId = "leader-1";
525         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
526
527         follower = createBehavior(context);
528
529         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
530
531         Assert.assertSame(follower, newBehavior);
532
533         assertEquals("Next index", 5, log.last().getIndex() + 1);
534         assertEquals("Entry 3", entries.get(0), log.get(3));
535         assertEquals("Entry 4", entries.get(1), log.get(4));
536
537         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
538         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
539
540         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
541     }
542
543     /**
544      * This test verifies that when a new AppendEntries message is received with
545      * new entries and the logs of the sender and receiver are out-of-sync that
546      * the log is first corrected by removing the out of sync entries from the
547      * log and then adding in the new entries sent with the AppendEntries message
548      */
549     @Test
550     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
551         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
552
553         MockRaftActorContext context = createActorContext();
554
555         // First set the receivers term to lower number
556         context.getTermInformation().update(1, "test");
557
558         // Prepare the receivers log
559         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
560         log.append(newReplicatedLogEntry(1, 0, "zero"));
561         log.append(newReplicatedLogEntry(1, 1, "one"));
562         log.append(newReplicatedLogEntry(1, 2, "two"));
563
564         context.setReplicatedLog(log);
565
566         // Prepare the entries to be sent with AppendEntries
567         List<ReplicatedLogEntry> entries = new ArrayList<>();
568         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
569         entries.add(newReplicatedLogEntry(2, 3, "three"));
570
571         // Send appendEntries with the same term as was set on the receiver
572         // before the new behavior was created (1 in this case)
573         // This will not work for a Candidate because as soon as a Candidate
574         // is created it increments the term
575         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
576
577         follower = createBehavior(context);
578
579         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
580
581         Assert.assertSame(follower, newBehavior);
582
583         // The entry at index 2 will be found out-of-sync with the leader
584         // and will be removed
585         // Then the two new entries will be added to the log
586         // Thus making the log to have 4 entries
587         assertEquals("Next index", 4, log.last().getIndex() + 1);
588         //assertEquals("Entry 2", entries.get(0), log.get(2));
589
590         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
591
592         // Check that the entry at index 2 has the new data
593         assertEquals("Entry 2", entries.get(0), log.get(2));
594
595         assertEquals("Entry 3", entries.get(1), log.get(3));
596
597         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
598     }
599
600     @Test
601     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
602         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
603
604         MockRaftActorContext context = createActorContext();
605
606         // First set the receivers term to lower number
607         context.getTermInformation().update(1, "test");
608
609         // Prepare the receivers log
610         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
611         log.append(newReplicatedLogEntry(1, 0, "zero"));
612         log.append(newReplicatedLogEntry(1, 1, "one"));
613         log.append(newReplicatedLogEntry(1, 2, "two"));
614
615         context.setReplicatedLog(log);
616
617         // Prepare the entries to be sent with AppendEntries
618         List<ReplicatedLogEntry> entries = new ArrayList<>();
619         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
620         entries.add(newReplicatedLogEntry(2, 3, "three"));
621
622         // Send appendEntries with the same term as was set on the receiver
623         // before the new behavior was created (1 in this case)
624         // This will not work for a Candidate because as soon as a Candidate
625         // is created it increments the term
626         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
627
628         context.setRaftPolicy(createRaftPolicy(false, true));
629         follower = createBehavior(context);
630
631         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
632
633         Assert.assertSame(follower, newBehavior);
634
635         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
636     }
637
638     @Test
639     public void testHandleAppendEntriesPreviousLogEntryMissing(){
640         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
641
642         MockRaftActorContext context = createActorContext();
643
644         // Prepare the receivers log
645         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
646         log.append(newReplicatedLogEntry(1, 0, "zero"));
647         log.append(newReplicatedLogEntry(1, 1, "one"));
648         log.append(newReplicatedLogEntry(1, 2, "two"));
649
650         context.setReplicatedLog(log);
651
652         // Prepare the entries to be sent with AppendEntries
653         List<ReplicatedLogEntry> entries = new ArrayList<>();
654         entries.add(newReplicatedLogEntry(1, 4, "four"));
655
656         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
657
658         follower = createBehavior(context);
659
660         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
661
662         Assert.assertSame(follower, newBehavior);
663
664         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
665     }
666
667     @Test
668     public void testHandleAppendEntriesWithExistingLogEntry() {
669         logStart("testHandleAppendEntriesWithExistingLogEntry");
670
671         MockRaftActorContext context = createActorContext();
672
673         context.getTermInformation().update(1, "test");
674
675         // Prepare the receivers log
676         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
677         log.append(newReplicatedLogEntry(1, 0, "zero"));
678         log.append(newReplicatedLogEntry(1, 1, "one"));
679
680         context.setReplicatedLog(log);
681
682         // Send the last entry again.
683         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
684
685         follower = createBehavior(context);
686
687         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
688
689         assertEquals("Next index", 2, log.last().getIndex() + 1);
690         assertEquals("Entry 1", entries.get(0), log.get(1));
691
692         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
693
694         // Send the last entry again and also a new one.
695
696         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
697
698         leaderActor.underlyingActor().clear();
699         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
700
701         assertEquals("Next index", 3, log.last().getIndex() + 1);
702         assertEquals("Entry 1", entries.get(0), log.get(1));
703         assertEquals("Entry 2", entries.get(1), log.get(2));
704
705         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
706     }
707
708     @Test
709     public void testHandleAppendEntriesAfterInstallingSnapshot(){
710         logStart("testHandleAppendAfterInstallingSnapshot");
711
712         MockRaftActorContext context = createActorContext();
713
714         // Prepare the receivers log
715         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
716
717         // Set up a log as if it has been snapshotted
718         log.setSnapshotIndex(3);
719         log.setSnapshotTerm(1);
720
721         context.setReplicatedLog(log);
722
723         // Prepare the entries to be sent with AppendEntries
724         List<ReplicatedLogEntry> entries = new ArrayList<>();
725         entries.add(newReplicatedLogEntry(1, 4, "four"));
726
727         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
728
729         follower = createBehavior(context);
730
731         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
732
733         Assert.assertSame(follower, newBehavior);
734
735         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
736     }
737
738
739     /**
740      * This test verifies that when InstallSnapshot is received by
741      * the follower its applied correctly.
742      *
743      * @throws Exception
744      */
745     @Test
746     public void testHandleInstallSnapshot() throws Exception {
747         logStart("testHandleInstallSnapshot");
748
749         MockRaftActorContext context = createActorContext();
750         context.getTermInformation().update(1, "leader");
751
752         follower = createBehavior(context);
753
754         ByteString bsSnapshot  = createSnapshot();
755         int offset = 0;
756         int snapshotLength = bsSnapshot.size();
757         int chunkSize = 50;
758         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
759         int lastIncludedIndex = 1;
760         int chunkIndex = 1;
761         InstallSnapshot lastInstallSnapshot = null;
762
763         for(int i = 0; i < totalChunks; i++) {
764             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
765             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
766                     chunkData, chunkIndex, totalChunks);
767             follower.handleMessage(leaderActor, lastInstallSnapshot);
768             offset = offset + 50;
769             lastIncludedIndex++;
770             chunkIndex++;
771         }
772
773         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
774                 ApplySnapshot.class);
775         Snapshot snapshot = applySnapshot.getSnapshot();
776         assertNotNull(lastInstallSnapshot);
777         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
778         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
779                 snapshot.getLastAppliedTerm());
780         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
781                 snapshot.getLastAppliedIndex());
782         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
783         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
784         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
785         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
786         applySnapshot.getCallback().onSuccess();
787
788         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
789                 leaderActor, InstallSnapshotReply.class);
790         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
791
792         chunkIndex = 1;
793         for(InstallSnapshotReply reply: replies) {
794             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
795             assertEquals("getTerm", 1, reply.getTerm());
796             assertEquals("isSuccess", true, reply.isSuccess());
797             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
798         }
799
800         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
801     }
802
803
804     /**
805      * Verify that when an AppendEntries is sent to a follower during a snapshot install
806      * the Follower short-circuits the processing of the AppendEntries message.
807      *
808      * @throws Exception
809      */
810     @Test
811     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
812         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
813
814         MockRaftActorContext context = createActorContext();
815
816         follower = createBehavior(context);
817
818         ByteString bsSnapshot  = createSnapshot();
819         int snapshotLength = bsSnapshot.size();
820         int chunkSize = 50;
821         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
822         int lastIncludedIndex = 1;
823
824         // Check that snapshot installation is not in progress
825         assertNull(follower.getSnapshotTracker());
826
827         // Make sure that we have more than 1 chunk to send
828         assertTrue(totalChunks > 1);
829
830         // Send an install snapshot with the first chunk to start the process of installing a snapshot
831         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
832         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
833                 chunkData, 1, totalChunks));
834
835         // Check if snapshot installation is in progress now
836         assertNotNull(follower.getSnapshotTracker());
837
838         // Send an append entry
839         AppendEntries appendEntries = mock(AppendEntries.class);
840         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
841
842         follower.handleMessage(leaderActor, appendEntries);
843
844         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
845         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
846         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
847         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
848
849         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
850         verify(appendEntries, never()).getPrevLogIndex();
851
852     }
853
854     @Test
855     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
856         logStart("testInitialSyncUpWithHandleInstallSnapshot");
857
858         MockRaftActorContext context = createActorContext();
859
860         follower = createBehavior(context);
861
862         ByteString bsSnapshot  = createSnapshot();
863         int offset = 0;
864         int snapshotLength = bsSnapshot.size();
865         int chunkSize = 50;
866         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
867         int lastIncludedIndex = 1;
868         int chunkIndex = 1;
869         InstallSnapshot lastInstallSnapshot = null;
870
871         for(int i = 0; i < totalChunks; i++) {
872             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
873             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
874                     chunkData, chunkIndex, totalChunks);
875             follower.handleMessage(leaderActor, lastInstallSnapshot);
876             offset = offset + 50;
877             lastIncludedIndex++;
878             chunkIndex++;
879         }
880
881         FollowerInitialSyncUpStatus syncStatus =
882                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
883
884         assertFalse(syncStatus.isInitialSyncDone());
885
886         // Clear all the messages
887         followerActor.underlyingActor().clear();
888
889         context.setLastApplied(101);
890         context.setCommitIndex(101);
891         setLastLogEntry(context, 1, 101,
892                 new MockRaftActorContext.MockPayload(""));
893
894         List<ReplicatedLogEntry> entries = Arrays.asList(
895                 newReplicatedLogEntry(2, 101, "foo"));
896
897         // The new commitIndex is 101
898         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
899         follower.handleMessage(leaderActor, appendEntries);
900
901         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
902
903         assertTrue(syncStatus.isInitialSyncDone());
904     }
905
906     @Test
907     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
908         logStart("testHandleOutOfSequenceInstallSnapshot");
909
910         MockRaftActorContext context = createActorContext();
911
912         follower = createBehavior(context);
913
914         ByteString bsSnapshot = createSnapshot();
915
916         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
917                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
918         follower.handleMessage(leaderActor, installSnapshot);
919
920         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
921                 InstallSnapshotReply.class);
922
923         assertEquals("isSuccess", false, reply.isSuccess());
924         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
925         assertEquals("getTerm", 1, reply.getTerm());
926         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
927
928         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
929     }
930
931     @Test
932     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
933         MockRaftActorContext context = createActorContext();
934
935         Stopwatch stopwatch = Stopwatch.createStarted();
936
937         follower = createBehavior(context);
938
939         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
940                 ElectionTimeout.class);
941
942         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
943
944         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
945
946         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
947         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
948     }
949
950     @Test
951     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){
952         MockRaftActorContext context = createActorContext();
953         context.setConfigParams(new DefaultConfigParamsImpl(){
954             @Override
955             public FiniteDuration getElectionTimeOutInterval() {
956                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
957             }
958         });
959
960         context.setRaftPolicy(createRaftPolicy(false, false));
961
962         follower = createBehavior(context);
963
964         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
965                 ElectionTimeout.class);
966         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
967         assertSame("handleMessage result", follower, newBehavior);
968     }
969
970     @Test
971     public void testFollowerSchedulesElectionIfNonVoting(){
972         MockRaftActorContext context = createActorContext();
973         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
974         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
975                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
976         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
977
978         follower = new Follower(context, "leader", (short)1);
979
980         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
981                 ElectionTimeout.class);
982         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
983         assertSame("handleMessage result", follower, newBehavior);
984         assertNull("Expected null leaderId", follower.getLeaderId());
985     }
986
987     @Test
988     public void testElectionScheduledWhenAnyRaftRPCReceived(){
989         MockRaftActorContext context = createActorContext();
990         follower = createBehavior(context);
991         follower.handleMessage(leaderActor, new RaftRPC() {
992             private static final long serialVersionUID = 1L;
993
994             @Override
995             public long getTerm() {
996                 return 100;
997             }
998         });
999         verify(follower).scheduleElection(any(FiniteDuration.class));
1000     }
1001
1002     @Test
1003     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
1004         MockRaftActorContext context = createActorContext();
1005         follower = createBehavior(context);
1006         follower.handleMessage(leaderActor, "non-raft-rpc");
1007         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1008     }
1009
1010     public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
1011         int snapshotLength = bs.size();
1012         int start = offset;
1013         int size = chunkSize;
1014         if (chunkSize > snapshotLength) {
1015             size = snapshotLength;
1016         } else {
1017             if ((start + chunkSize) > snapshotLength) {
1018                 size = snapshotLength - start;
1019             }
1020         }
1021
1022         byte[] nextChunk = new byte[size];
1023         bs.copyTo(nextChunk, start, 0, size);
1024         return nextChunk;
1025     }
1026
1027     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1028             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1029         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1030     }
1031
1032     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1033                                                    String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1034                                                    boolean expForceInstallSnapshot) {
1035
1036         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1037                 AppendEntriesReply.class);
1038
1039         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1040         assertEquals("getTerm", expTerm, reply.getTerm());
1041         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1042         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1043         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1044         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1045         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1046     }
1047
1048
1049     private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1050         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1051                 new MockRaftActorContext.MockPayload(data));
1052     }
1053
1054     private ByteString createSnapshot(){
1055         HashMap<String, String> followerSnapshot = new HashMap<>();
1056         followerSnapshot.put("1", "A");
1057         followerSnapshot.put("2", "B");
1058         followerSnapshot.put("3", "C");
1059
1060         return toByteString(followerSnapshot);
1061     }
1062
1063     @Override
1064     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1065             ActorRef actorRef, RaftRPC rpc) throws Exception {
1066         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1067
1068         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1069         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1070     }
1071
1072     @Override
1073     protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1074             throws Exception {
1075         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1076         assertEquals("isSuccess", true, reply.isSuccess());
1077     }
1078 }