Merge "Bug-1978:Leader initializes commit index to last index"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import com.google.protobuf.ByteString;
7 import junit.framework.Assert;
8 import org.junit.Test;
9 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
10 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
11 import org.opendaylight.controller.cluster.raft.RaftActorContext;
12 import org.opendaylight.controller.cluster.raft.RaftState;
13 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
14 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
15 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
16 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
18 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
19 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
21 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
22 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
23 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
24
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectOutputStream;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33
34 import static org.junit.Assert.assertEquals;
35 import static org.junit.Assert.assertNotNull;
36 import static org.junit.Assert.assertTrue;
37
38 public class FollowerTest extends AbstractRaftActorBehaviorTest {
39
40     private final ActorRef followerActor = getSystem().actorOf(Props.create(
41         DoNothingActor.class));
42
43
44     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
45         return new Follower(actorContext);
46     }
47
48     @Override protected  RaftActorContext createActorContext() {
49         return createActorContext(followerActor);
50     }
51
52     protected  RaftActorContext createActorContext(ActorRef actorRef){
53         return new MockRaftActorContext("test", getSystem(), actorRef);
54     }
55
56     @Test
57     public void testThatAnElectionTimeoutIsTriggered(){
58         new JavaTestKit(getSystem()) {{
59
60             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
61                 protected void run() {
62
63                     Follower follower = new Follower(createActorContext(getTestActor()));
64
65                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
66                         // do not put code outside this method, will run afterwards
67                         protected Boolean match(Object in) {
68                             if (in instanceof ElectionTimeout) {
69                                 return true;
70                             } else {
71                                 throw noMatch();
72                             }
73                         }
74                     }.get();
75
76                     assertEquals(true, out);
77                 }
78             };
79         }};
80     }
81
82     @Test
83     public void testHandleElectionTimeout(){
84         RaftActorContext raftActorContext = createActorContext();
85         Follower follower =
86             new Follower(raftActorContext);
87
88         RaftState raftState =
89             follower.handleMessage(followerActor, new ElectionTimeout());
90
91         Assert.assertEquals(RaftState.Candidate, raftState);
92     }
93
94     @Test
95     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
96         new JavaTestKit(getSystem()) {{
97
98             new Within(duration("1 seconds")) {
99                 protected void run() {
100
101                     RaftActorContext context = createActorContext(getTestActor());
102
103                     context.getTermInformation().update(1000, null);
104
105                     RaftActorBehavior follower = createBehavior(context);
106
107                     follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
108
109                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
110                         // do not put code outside this method, will run afterwards
111                         protected Boolean match(Object in) {
112                             if (in instanceof RequestVoteReply) {
113                                 RequestVoteReply reply = (RequestVoteReply) in;
114                                 return reply.isVoteGranted();
115                             } else {
116                                 throw noMatch();
117                             }
118                         }
119                     }.get();
120
121                     assertEquals(true, out);
122                 }
123             };
124         }};
125     }
126
127     @Test
128     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
129         new JavaTestKit(getSystem()) {{
130
131             new Within(duration("1 seconds")) {
132                 protected void run() {
133
134                     RaftActorContext context = createActorContext(getTestActor());
135
136                     context.getTermInformation().update(1000, "test");
137
138                     RaftActorBehavior follower = createBehavior(context);
139
140                     follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
141
142                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
143                         // do not put code outside this method, will run afterwards
144                         protected Boolean match(Object in) {
145                             if (in instanceof RequestVoteReply) {
146                                 RequestVoteReply reply = (RequestVoteReply) in;
147                                 return reply.isVoteGranted();
148                             } else {
149                                 throw noMatch();
150                             }
151                         }
152                     }.get();
153
154                     assertEquals(false, out);
155                 }
156             };
157         }};
158     }
159
160     /**
161      * This test verifies that when an AppendEntries RPC is received by a RaftActor
162      * with a commitIndex that is greater than what has been applied to the
163      * state machine of the RaftActor, the RaftActor applies the state and
164      * sets it current applied state to the commitIndex of the sender.
165      *
166      * @throws Exception
167      */
168     @Test
169     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
170         new JavaTestKit(getSystem()) {{
171
172             RaftActorContext context =
173                 createActorContext();
174
175             context.setLastApplied(100);
176             setLastLogEntry((MockRaftActorContext) context, 1, 100,
177                 new MockRaftActorContext.MockPayload(""));
178             ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
179
180             List<ReplicatedLogEntry> entries =
181                 Arrays.asList(
182                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
183                                 new MockRaftActorContext.MockPayload("foo"))
184                 );
185
186             // The new commitIndex is 101
187             AppendEntries appendEntries =
188                 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
189
190             RaftState raftState =
191                 createBehavior(context).handleMessage(getRef(), appendEntries);
192
193             assertEquals(101L, context.getLastApplied());
194
195         }};
196     }
197
198     /**
199      * This test verifies that when an AppendEntries is received a specific prevLogTerm
200      * which does not match the term that is in RaftActors log entry at prevLogIndex
201      * then the RaftActor does not change it's state and it returns a failure.
202      *
203      * @throws Exception
204      */
205     @Test
206     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
207         throws Exception {
208         new JavaTestKit(getSystem()) {{
209
210             MockRaftActorContext context = (MockRaftActorContext)
211                 createActorContext();
212
213             // First set the receivers term to lower number
214             context.getTermInformation().update(95, "test");
215
216             // Set the last log entry term for the receiver to be greater than
217             // what we will be sending as the prevLogTerm in AppendEntries
218             MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
219                 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
220
221             // AppendEntries is now sent with a bigger term
222             // this will set the receivers term to be the same as the sender's term
223             AppendEntries appendEntries =
224                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
225
226             RaftActorBehavior behavior = createBehavior(context);
227
228             // Send an unknown message so that the state of the RaftActor remains unchanged
229             RaftState expected = behavior.handleMessage(getRef(), "unknown");
230
231             RaftState raftState =
232                 behavior.handleMessage(getRef(), appendEntries);
233
234             assertEquals(expected, raftState);
235
236             // Also expect an AppendEntriesReply to be sent where success is false
237             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
238                 "AppendEntriesReply") {
239                 // do not put code outside this method, will run afterwards
240                 protected Boolean match(Object in) {
241                     if (in instanceof AppendEntriesReply) {
242                         AppendEntriesReply reply = (AppendEntriesReply) in;
243                         return reply.isSuccess();
244                     } else {
245                         throw noMatch();
246                     }
247                 }
248             }.get();
249
250             assertEquals(false, out);
251
252
253         }};
254     }
255
256
257
258     /**
259      * This test verifies that when a new AppendEntries message is received with
260      * new entries and the logs of the sender and receiver match that the new
261      * entries get added to the log and the log is incremented by the number of
262      * entries received in appendEntries
263      *
264      * @throws Exception
265      */
266     @Test
267     public void testHandleAppendEntriesAddNewEntries() throws Exception {
268         new JavaTestKit(getSystem()) {{
269
270             MockRaftActorContext context = (MockRaftActorContext)
271                 createActorContext();
272
273             // First set the receivers term to lower number
274             context.getTermInformation().update(1, "test");
275
276             // Prepare the receivers log
277             MockRaftActorContext.SimpleReplicatedLog log =
278                 new MockRaftActorContext.SimpleReplicatedLog();
279             log.append(
280                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
281             log.append(
282                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
283             log.append(
284                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
285
286             context.setReplicatedLog(log);
287
288             // Prepare the entries to be sent with AppendEntries
289             List<ReplicatedLogEntry> entries = new ArrayList<>();
290             entries.add(
291                 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
292             entries.add(
293                 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
294
295             // Send appendEntries with the same term as was set on the receiver
296             // before the new behavior was created (1 in this case)
297             // This will not work for a Candidate because as soon as a Candidate
298             // is created it increments the term
299             AppendEntries appendEntries =
300                 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
301
302             RaftActorBehavior behavior = createBehavior(context);
303
304             // Send an unknown message so that the state of the RaftActor remains unchanged
305             RaftState expected = behavior.handleMessage(getRef(), "unknown");
306
307             RaftState raftState =
308                 behavior.handleMessage(getRef(), appendEntries);
309
310             assertEquals(expected, raftState);
311             assertEquals(5, log.last().getIndex() + 1);
312             assertNotNull(log.get(3));
313             assertNotNull(log.get(4));
314
315             // Also expect an AppendEntriesReply to be sent where success is false
316             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
317                 "AppendEntriesReply") {
318                 // do not put code outside this method, will run afterwards
319                 protected Boolean match(Object in) {
320                     if (in instanceof AppendEntriesReply) {
321                         AppendEntriesReply reply = (AppendEntriesReply) in;
322                         return reply.isSuccess();
323                     } else {
324                         throw noMatch();
325                     }
326                 }
327             }.get();
328
329             assertEquals(true, out);
330
331
332         }};
333     }
334
335
336
337     /**
338      * This test verifies that when a new AppendEntries message is received with
339      * new entries and the logs of the sender and receiver are out-of-sync that
340      * the log is first corrected by removing the out of sync entries from the
341      * log and then adding in the new entries sent with the AppendEntries message
342      *
343      * @throws Exception
344      */
345     @Test
346     public void testHandleAppendEntriesCorrectReceiverLogEntries()
347         throws Exception {
348         new JavaTestKit(getSystem()) {{
349
350             MockRaftActorContext context = (MockRaftActorContext)
351                 createActorContext();
352
353             // First set the receivers term to lower number
354             context.getTermInformation().update(2, "test");
355
356             // Prepare the receivers log
357             MockRaftActorContext.SimpleReplicatedLog log =
358                 new MockRaftActorContext.SimpleReplicatedLog();
359             log.append(
360                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
361             log.append(
362                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
363             log.append(
364                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
365
366             context.setReplicatedLog(log);
367
368             // Prepare the entries to be sent with AppendEntries
369             List<ReplicatedLogEntry> entries = new ArrayList<>();
370             entries.add(
371                 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
372             entries.add(
373                 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
374
375             // Send appendEntries with the same term as was set on the receiver
376             // before the new behavior was created (1 in this case)
377             // This will not work for a Candidate because as soon as a Candidate
378             // is created it increments the term
379             AppendEntries appendEntries =
380                 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
381
382             RaftActorBehavior behavior = createBehavior(context);
383
384             // Send an unknown message so that the state of the RaftActor remains unchanged
385             RaftState expected = behavior.handleMessage(getRef(), "unknown");
386
387             RaftState raftState =
388                 behavior.handleMessage(getRef(), appendEntries);
389
390             assertEquals(expected, raftState);
391
392             // The entry at index 2 will be found out-of-sync with the leader
393             // and will be removed
394             // Then the two new entries will be added to the log
395             // Thus making the log to have 4 entries
396             assertEquals(4, log.last().getIndex() + 1);
397             assertNotNull(log.get(2));
398
399             assertEquals("one", log.get(1).getData().toString());
400
401             // Check that the entry at index 2 has the new data
402             assertEquals("two-1", log.get(2).getData().toString());
403
404             assertEquals("three", log.get(3).getData().toString());
405
406             assertNotNull(log.get(3));
407
408             // Also expect an AppendEntriesReply to be sent where success is false
409             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
410                 "AppendEntriesReply") {
411                 // do not put code outside this method, will run afterwards
412                 protected Boolean match(Object in) {
413                     if (in instanceof AppendEntriesReply) {
414                         AppendEntriesReply reply = (AppendEntriesReply) in;
415                         return reply.isSuccess();
416                     } else {
417                         throw noMatch();
418                     }
419                 }
420             }.get();
421
422             assertEquals(true, out);
423
424
425         }};
426     }
427
428
429     /**
430      * This test verifies that when InstallSnapshot is received by
431      * the follower its applied correctly.
432      *
433      * @throws Exception
434      */
435     @Test
436     public void testHandleInstallSnapshot() throws Exception {
437         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
438
439             ActorRef leaderActor = getSystem().actorOf(Props.create(
440                 MessageCollectorActor.class));
441
442             MockRaftActorContext context = (MockRaftActorContext)
443                 createActorContext(getRef());
444
445             Follower follower = (Follower)createBehavior(context);
446
447             HashMap<String, String> followerSnapshot = new HashMap<>();
448             followerSnapshot.put("1", "A");
449             followerSnapshot.put("2", "B");
450             followerSnapshot.put("3", "C");
451
452             ByteString bsSnapshot  = toByteString(followerSnapshot);
453             ByteString chunkData = ByteString.EMPTY;
454             int offset = 0;
455             int snapshotLength = bsSnapshot.size();
456             int i = 1;
457
458             do {
459                 chunkData = getNextChunk(bsSnapshot, offset);
460                 final InstallSnapshot installSnapshot =
461                     new InstallSnapshot(1, "leader-1", i, 1,
462                         chunkData, i, 3);
463                 follower.handleMessage(leaderActor, installSnapshot);
464                 offset = offset + 50;
465                 i++;
466             } while ((offset+50) < snapshotLength);
467
468             final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
469             follower.handleMessage(leaderActor, installSnapshot3);
470
471             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
472                 @Override
473                 protected String match(Object o) throws Exception {
474                     if (o instanceof ApplySnapshot) {
475                         ApplySnapshot as = (ApplySnapshot)o;
476                         if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
477                             return "applySnapshot-lastIndex-mismatch";
478                         }
479                         if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
480                             return "applySnapshot-lastAppliedTerm-mismatch";
481                         }
482                         if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
483                             return "applySnapshot-lastAppliedIndex-mismatch";
484                         }
485                         if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
486                             return "applySnapshot-lastTerm-mismatch";
487                         }
488                         return "applySnapshot";
489                     }
490
491                     return "ignoreCase";
492                 }
493             }.get();
494
495             String applySnapshotMatch = "";
496             for (String reply: matches) {
497                 if (reply.startsWith("applySnapshot")) {
498                     applySnapshotMatch = reply;
499                 }
500             }
501
502             assertEquals("applySnapshot", applySnapshotMatch);
503
504             Object messages = executeLocalOperation(leaderActor, "get-all-messages");
505
506             assertNotNull(messages);
507             assertTrue(messages instanceof List);
508             List<Object> listMessages = (List<Object>) messages;
509
510             int installSnapshotReplyReceivedCount = 0;
511             for (Object message: listMessages) {
512                 if (message instanceof InstallSnapshotReply) {
513                     ++installSnapshotReplyReceivedCount;
514                 }
515             }
516
517             assertEquals(3, installSnapshotReplyReceivedCount);
518
519         }};
520     }
521
522     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
523         return MessageCollectorActor.getAllMessages(actor);
524     }
525
526     public ByteString getNextChunk (ByteString bs, int offset){
527         int snapshotLength = bs.size();
528         int start = offset;
529         int size = 50;
530         if (50 > snapshotLength) {
531             size = snapshotLength;
532         } else {
533             if ((start + 50) > snapshotLength) {
534                 size = snapshotLength - start;
535             }
536         }
537         return bs.substring(start, start + size);
538     }
539
540     private ByteString toByteString(Map<String, String> state) {
541         ByteArrayOutputStream b = null;
542         ObjectOutputStream o = null;
543         try {
544             try {
545                 b = new ByteArrayOutputStream();
546                 o = new ObjectOutputStream(b);
547                 o.writeObject(state);
548                 byte[] snapshotBytes = b.toByteArray();
549                 return ByteString.copyFrom(snapshotBytes);
550             } finally {
551                 if (o != null) {
552                     o.flush();
553                     o.close();
554                 }
555                 if (b != null) {
556                     b.close();
557                 }
558             }
559         } catch (IOException e) {
560             org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
561         }
562         return null;
563     }
564 }