Merge "Use Preconditions"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertTrue;
5 import akka.actor.ActorRef;
6 import akka.actor.Props;
7 import akka.testkit.TestActorRef;
8 import com.google.protobuf.ByteString;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.HashMap;
12 import java.util.List;
13 import org.junit.After;
14 import org.junit.Assert;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
17 import org.opendaylight.controller.cluster.raft.RaftActorContext;
18 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
19 import org.opendaylight.controller.cluster.raft.Snapshot;
20 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
21 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
23 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
24 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
25 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
26 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
27 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
28 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
29 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
30
31 public class FollowerTest extends AbstractRaftActorBehaviorTest {
32
33     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
34             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
35
36     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
37             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
38
39     private RaftActorBehavior follower;
40
41     @Override
42     @After
43     public void tearDown() throws Exception {
44         if(follower != null) {
45             follower.close();
46         }
47
48         super.tearDown();
49     }
50
51     @Override
52     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
53         return new Follower(actorContext);
54     }
55
56     @Override
57     protected  MockRaftActorContext createActorContext() {
58         return createActorContext(followerActor);
59     }
60
61     @Override
62     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
63         return new MockRaftActorContext("follower", getSystem(), actorRef);
64     }
65
66     @Test
67     public void testThatAnElectionTimeoutIsTriggered(){
68         MockRaftActorContext actorContext = createActorContext();
69         follower = new Follower(actorContext);
70
71         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
72                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
73     }
74
75     @Test
76     public void testHandleElectionTimeout(){
77         logStart("testHandleElectionTimeout");
78
79         follower = new Follower(createActorContext());
80
81         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
82
83         assertTrue(raftBehavior instanceof Candidate);
84     }
85
86     @Test
87     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
88         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
89
90         RaftActorContext context = createActorContext();
91         long term = 1000;
92         context.getTermInformation().update(term, null);
93
94         follower = createBehavior(context);
95
96         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
97
98         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
99
100         assertEquals("isVoteGranted", true, reply.isVoteGranted());
101         assertEquals("getTerm", term, reply.getTerm());
102     }
103
104     @Test
105     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
106         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
107
108         RaftActorContext context = createActorContext();
109         long term = 1000;
110         context.getTermInformation().update(term, "test");
111
112         follower = createBehavior(context);
113
114         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
115
116         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
117
118         assertEquals("isVoteGranted", false, reply.isVoteGranted());
119     }
120
121     /**
122      * This test verifies that when an AppendEntries RPC is received by a RaftActor
123      * with a commitIndex that is greater than what has been applied to the
124      * state machine of the RaftActor, the RaftActor applies the state and
125      * sets it current applied state to the commitIndex of the sender.
126      *
127      * @throws Exception
128      */
129     @Test
130     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
131         logStart("testHandleAppendEntriesWithNewerCommitIndex");
132
133         MockRaftActorContext context = createActorContext();
134
135         context.setLastApplied(100);
136         setLastLogEntry(context, 1, 100,
137                 new MockRaftActorContext.MockPayload(""));
138         context.getReplicatedLog().setSnapshotIndex(99);
139
140         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
141                 newReplicatedLogEntry(2, 101, "foo"));
142
143         // The new commitIndex is 101
144         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
145
146         follower = createBehavior(context);
147         follower.handleMessage(leaderActor, appendEntries);
148
149         assertEquals("getLastApplied", 101L, context.getLastApplied());
150     }
151
152     /**
153      * This test verifies that when an AppendEntries is received a specific prevLogTerm
154      * which does not match the term that is in RaftActors log entry at prevLogIndex
155      * then the RaftActor does not change it's state and it returns a failure.
156      *
157      * @throws Exception
158      */
159     @Test
160     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
161         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
162
163         MockRaftActorContext context = createActorContext();
164
165         // First set the receivers term to lower number
166         context.getTermInformation().update(95, "test");
167
168         // AppendEntries is now sent with a bigger term
169         // this will set the receivers term to be the same as the sender's term
170         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
171
172         follower = createBehavior(context);
173
174         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
175
176         Assert.assertSame(follower, newBehavior);
177
178         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
179                 AppendEntriesReply.class);
180
181         assertEquals("isSuccess", false, reply.isSuccess());
182     }
183
184     /**
185      * This test verifies that when a new AppendEntries message is received with
186      * new entries and the logs of the sender and receiver match that the new
187      * entries get added to the log and the log is incremented by the number of
188      * entries received in appendEntries
189      *
190      * @throws Exception
191      */
192     @Test
193     public void testHandleAppendEntriesAddNewEntries() {
194         logStart("testHandleAppendEntriesAddNewEntries");
195
196         MockRaftActorContext context = createActorContext();
197
198         // First set the receivers term to lower number
199         context.getTermInformation().update(1, "test");
200
201         // Prepare the receivers log
202         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
203         log.append(newReplicatedLogEntry(1, 0, "zero"));
204         log.append(newReplicatedLogEntry(1, 1, "one"));
205         log.append(newReplicatedLogEntry(1, 2, "two"));
206
207         context.setReplicatedLog(log);
208
209         // Prepare the entries to be sent with AppendEntries
210         List<ReplicatedLogEntry> entries = new ArrayList<>();
211         entries.add(newReplicatedLogEntry(1, 3, "three"));
212         entries.add(newReplicatedLogEntry(1, 4, "four"));
213
214         // Send appendEntries with the same term as was set on the receiver
215         // before the new behavior was created (1 in this case)
216         // This will not work for a Candidate because as soon as a Candidate
217         // is created it increments the term
218         AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
219
220         follower = createBehavior(context);
221
222         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
223
224         Assert.assertSame(follower, newBehavior);
225
226         assertEquals("Next index", 5, log.last().getIndex() + 1);
227         assertEquals("Entry 3", entries.get(0), log.get(3));
228         assertEquals("Entry 4", entries.get(1), log.get(4));
229
230         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
231     }
232
233     /**
234      * This test verifies that when a new AppendEntries message is received with
235      * new entries and the logs of the sender and receiver are out-of-sync that
236      * the log is first corrected by removing the out of sync entries from the
237      * log and then adding in the new entries sent with the AppendEntries message
238      */
239     @Test
240     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
241         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
242
243         MockRaftActorContext context = createActorContext();
244
245         // First set the receivers term to lower number
246         context.getTermInformation().update(1, "test");
247
248         // Prepare the receivers log
249         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
250         log.append(newReplicatedLogEntry(1, 0, "zero"));
251         log.append(newReplicatedLogEntry(1, 1, "one"));
252         log.append(newReplicatedLogEntry(1, 2, "two"));
253
254         context.setReplicatedLog(log);
255
256         // Prepare the entries to be sent with AppendEntries
257         List<ReplicatedLogEntry> entries = new ArrayList<>();
258         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
259         entries.add(newReplicatedLogEntry(2, 3, "three"));
260
261         // Send appendEntries with the same term as was set on the receiver
262         // before the new behavior was created (1 in this case)
263         // This will not work for a Candidate because as soon as a Candidate
264         // is created it increments the term
265         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
266
267         follower = createBehavior(context);
268
269         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
270
271         Assert.assertSame(follower, newBehavior);
272
273         // The entry at index 2 will be found out-of-sync with the leader
274         // and will be removed
275         // Then the two new entries will be added to the log
276         // Thus making the log to have 4 entries
277         assertEquals("Next index", 4, log.last().getIndex() + 1);
278         //assertEquals("Entry 2", entries.get(0), log.get(2));
279
280         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
281
282         // Check that the entry at index 2 has the new data
283         assertEquals("Entry 2", entries.get(0), log.get(2));
284
285         assertEquals("Entry 3", entries.get(1), log.get(3));
286
287         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
288     }
289
290     @Test
291     public void testHandleAppendEntriesPreviousLogEntryMissing(){
292         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
293
294         MockRaftActorContext context = createActorContext();
295
296         // Prepare the receivers log
297         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
298         log.append(newReplicatedLogEntry(1, 0, "zero"));
299         log.append(newReplicatedLogEntry(1, 1, "one"));
300         log.append(newReplicatedLogEntry(1, 2, "two"));
301
302         context.setReplicatedLog(log);
303
304         // Prepare the entries to be sent with AppendEntries
305         List<ReplicatedLogEntry> entries = new ArrayList<>();
306         entries.add(newReplicatedLogEntry(1, 4, "four"));
307
308         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
309
310         follower = createBehavior(context);
311
312         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
313
314         Assert.assertSame(follower, newBehavior);
315
316         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
317     }
318
319     @Test
320     public void testHandleAppendEntriesWithExistingLogEntry() {
321         logStart("testHandleAppendEntriesWithExistingLogEntry");
322
323         MockRaftActorContext context = createActorContext();
324
325         context.getTermInformation().update(1, "test");
326
327         // Prepare the receivers log
328         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
329         log.append(newReplicatedLogEntry(1, 0, "zero"));
330         log.append(newReplicatedLogEntry(1, 1, "one"));
331
332         context.setReplicatedLog(log);
333
334         // Send the last entry again.
335         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
336
337         follower = createBehavior(context);
338
339         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
340
341         assertEquals("Next index", 2, log.last().getIndex() + 1);
342         assertEquals("Entry 1", entries.get(0), log.get(1));
343
344         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
345
346         // Send the last entry again and also a new one.
347
348         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
349
350         leaderActor.underlyingActor().clear();
351         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
352
353         assertEquals("Next index", 3, log.last().getIndex() + 1);
354         assertEquals("Entry 1", entries.get(0), log.get(1));
355         assertEquals("Entry 2", entries.get(1), log.get(2));
356
357         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
358     }
359
360     @Test
361     public void testHandleAppendAfterInstallingSnapshot(){
362         logStart("testHandleAppendAfterInstallingSnapshot");
363
364         MockRaftActorContext context = createActorContext();
365
366         // Prepare the receivers log
367         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
368
369         // Set up a log as if it has been snapshotted
370         log.setSnapshotIndex(3);
371         log.setSnapshotTerm(1);
372
373         context.setReplicatedLog(log);
374
375         // Prepare the entries to be sent with AppendEntries
376         List<ReplicatedLogEntry> entries = new ArrayList<>();
377         entries.add(newReplicatedLogEntry(1, 4, "four"));
378
379         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
380
381         follower = createBehavior(context);
382
383         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
384
385         Assert.assertSame(follower, newBehavior);
386
387         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
388     }
389
390
391     /**
392      * This test verifies that when InstallSnapshot is received by
393      * the follower its applied correctly.
394      *
395      * @throws Exception
396      */
397     @Test
398     public void testHandleInstallSnapshot() throws Exception {
399         logStart("testHandleInstallSnapshot");
400
401         MockRaftActorContext context = createActorContext();
402
403         follower = createBehavior(context);
404
405         HashMap<String, String> followerSnapshot = new HashMap<>();
406         followerSnapshot.put("1", "A");
407         followerSnapshot.put("2", "B");
408         followerSnapshot.put("3", "C");
409
410         ByteString bsSnapshot  = toByteString(followerSnapshot);
411         int offset = 0;
412         int snapshotLength = bsSnapshot.size();
413         int chunkSize = 50;
414         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
415         int lastIncludedIndex = 1;
416         int chunkIndex = 1;
417         InstallSnapshot lastInstallSnapshot = null;
418
419         for(int i = 0; i < totalChunks; i++) {
420             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
421             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
422                     chunkData, chunkIndex, totalChunks);
423             follower.handleMessage(leaderActor, lastInstallSnapshot);
424             offset = offset + 50;
425             lastIncludedIndex++;
426             chunkIndex++;
427         }
428
429         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
430                 ApplySnapshot.class);
431         Snapshot snapshot = applySnapshot.getSnapshot();
432         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
433         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
434                 snapshot.getLastAppliedTerm());
435         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
436                 snapshot.getLastAppliedIndex());
437         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
438         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
439
440         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
441                 leaderActor, InstallSnapshotReply.class);
442         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
443
444         chunkIndex = 1;
445         for(InstallSnapshotReply reply: replies) {
446             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
447             assertEquals("getTerm", 1, reply.getTerm());
448             assertEquals("isSuccess", true, reply.isSuccess());
449             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
450         }
451
452         Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker());
453     }
454
455     @Test
456     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
457         logStart("testHandleOutOfSequenceInstallSnapshot");
458
459         MockRaftActorContext context = createActorContext();
460
461         follower = createBehavior(context);
462
463         HashMap<String, String> followerSnapshot = new HashMap<>();
464         followerSnapshot.put("1", "A");
465         followerSnapshot.put("2", "B");
466         followerSnapshot.put("3", "C");
467
468         ByteString bsSnapshot = toByteString(followerSnapshot);
469
470         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
471                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
472         follower.handleMessage(leaderActor, installSnapshot);
473
474         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
475                 InstallSnapshotReply.class);
476
477         assertEquals("isSuccess", false, reply.isSuccess());
478         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
479         assertEquals("getTerm", 1, reply.getTerm());
480         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
481
482         Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker());
483     }
484
485     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
486         int snapshotLength = bs.size();
487         int start = offset;
488         int size = chunkSize;
489         if (chunkSize > snapshotLength) {
490             size = snapshotLength;
491         } else {
492             if ((start + chunkSize) > snapshotLength) {
493                 size = snapshotLength - start;
494             }
495         }
496         return bs.substring(start, start + size);
497     }
498
499     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
500             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
501
502         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
503                 AppendEntriesReply.class);
504
505         assertEquals("isSuccess", expSuccess, reply.isSuccess());
506         assertEquals("getTerm", expTerm, reply.getTerm());
507         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
508         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
509         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
510     }
511
512     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
513         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
514                 new MockRaftActorContext.MockPayload(data));
515     }
516
517     @Override
518     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
519             ActorRef actorRef, RaftRPC rpc) throws Exception {
520         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
521
522         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
523         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
524     }
525
526     @Override
527     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
528             throws Exception {
529         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
530         assertEquals("isSuccess", true, reply.isSuccess());
531     }
532 }