Bug 1726 : Adding test-case to check Install Snapshot functionality is handled correctly
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import akka.util.Timeout;
7 import com.google.protobuf.ByteString;
8 import junit.framework.Assert;
9 import org.junit.Test;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
12 import org.opendaylight.controller.cluster.raft.RaftActorContext;
13 import org.opendaylight.controller.cluster.raft.RaftState;
14 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
15 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
16 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
18 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
19 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
20 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
21 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
23 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
24 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
25 import scala.concurrent.Await;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.Duration;
28 import scala.concurrent.duration.FiniteDuration;
29
30 import java.io.ByteArrayOutputStream;
31 import java.io.IOException;
32 import java.io.ObjectOutputStream;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39
40 import static akka.pattern.Patterns.ask;
41 import static org.junit.Assert.assertEquals;
42 import static org.junit.Assert.assertNotNull;
43 import static org.junit.Assert.assertTrue;
44
45 public class FollowerTest extends AbstractRaftActorBehaviorTest {
46
47     private final ActorRef followerActor = getSystem().actorOf(Props.create(
48         DoNothingActor.class));
49
50
51     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
52         return new Follower(actorContext);
53     }
54
55     @Override protected  RaftActorContext createActorContext() {
56         return createActorContext(followerActor);
57     }
58
59     protected  RaftActorContext createActorContext(ActorRef actorRef){
60         return new MockRaftActorContext("test", getSystem(), actorRef);
61     }
62
63     @Test
64     public void testThatAnElectionTimeoutIsTriggered(){
65         new JavaTestKit(getSystem()) {{
66
67             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
68                 protected void run() {
69
70                     Follower follower = new Follower(createActorContext(getTestActor()));
71
72                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
73                         // do not put code outside this method, will run afterwards
74                         protected Boolean match(Object in) {
75                             if (in instanceof ElectionTimeout) {
76                                 return true;
77                             } else {
78                                 throw noMatch();
79                             }
80                         }
81                     }.get();
82
83                     assertEquals(true, out);
84                 }
85             };
86         }};
87     }
88
89     @Test
90     public void testHandleElectionTimeout(){
91         RaftActorContext raftActorContext = createActorContext();
92         Follower follower =
93             new Follower(raftActorContext);
94
95         RaftState raftState =
96             follower.handleMessage(followerActor, new ElectionTimeout());
97
98         Assert.assertEquals(RaftState.Candidate, raftState);
99     }
100
101     @Test
102     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
103         new JavaTestKit(getSystem()) {{
104
105             new Within(duration("1 seconds")) {
106                 protected void run() {
107
108                     RaftActorContext context = createActorContext(getTestActor());
109
110                     context.getTermInformation().update(1000, null);
111
112                     RaftActorBehavior follower = createBehavior(context);
113
114                     follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
115
116                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
117                         // do not put code outside this method, will run afterwards
118                         protected Boolean match(Object in) {
119                             if (in instanceof RequestVoteReply) {
120                                 RequestVoteReply reply = (RequestVoteReply) in;
121                                 return reply.isVoteGranted();
122                             } else {
123                                 throw noMatch();
124                             }
125                         }
126                     }.get();
127
128                     assertEquals(true, out);
129                 }
130             };
131         }};
132     }
133
134     @Test
135     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
136         new JavaTestKit(getSystem()) {{
137
138             new Within(duration("1 seconds")) {
139                 protected void run() {
140
141                     RaftActorContext context = createActorContext(getTestActor());
142
143                     context.getTermInformation().update(1000, "test");
144
145                     RaftActorBehavior follower = createBehavior(context);
146
147                     follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
148
149                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
150                         // do not put code outside this method, will run afterwards
151                         protected Boolean match(Object in) {
152                             if (in instanceof RequestVoteReply) {
153                                 RequestVoteReply reply = (RequestVoteReply) in;
154                                 return reply.isVoteGranted();
155                             } else {
156                                 throw noMatch();
157                             }
158                         }
159                     }.get();
160
161                     assertEquals(false, out);
162                 }
163             };
164         }};
165     }
166
167     /**
168      * This test verifies that when an AppendEntries RPC is received by a RaftActor
169      * with a commitIndex that is greater than what has been applied to the
170      * state machine of the RaftActor, the RaftActor applies the state and
171      * sets it current applied state to the commitIndex of the sender.
172      *
173      * @throws Exception
174      */
175     @Test
176     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
177         new JavaTestKit(getSystem()) {{
178
179             RaftActorContext context =
180                 createActorContext();
181
182             context.setLastApplied(100);
183             setLastLogEntry((MockRaftActorContext) context, 1, 100,
184                 new MockRaftActorContext.MockPayload(""));
185             ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
186
187             List<ReplicatedLogEntry> entries =
188                 Arrays.asList(
189                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
190                                 new MockRaftActorContext.MockPayload("foo"))
191                 );
192
193             // The new commitIndex is 101
194             AppendEntries appendEntries =
195                 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
196
197             RaftState raftState =
198                 createBehavior(context).handleMessage(getRef(), appendEntries);
199
200             assertEquals(101L, context.getLastApplied());
201
202         }};
203     }
204
205     /**
206      * This test verifies that when an AppendEntries is received a specific prevLogTerm
207      * which does not match the term that is in RaftActors log entry at prevLogIndex
208      * then the RaftActor does not change it's state and it returns a failure.
209      *
210      * @throws Exception
211      */
212     @Test
213     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
214         throws Exception {
215         new JavaTestKit(getSystem()) {{
216
217             MockRaftActorContext context = (MockRaftActorContext)
218                 createActorContext();
219
220             // First set the receivers term to lower number
221             context.getTermInformation().update(95, "test");
222
223             // Set the last log entry term for the receiver to be greater than
224             // what we will be sending as the prevLogTerm in AppendEntries
225             MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
226                 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
227
228             // AppendEntries is now sent with a bigger term
229             // this will set the receivers term to be the same as the sender's term
230             AppendEntries appendEntries =
231                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
232
233             RaftActorBehavior behavior = createBehavior(context);
234
235             // Send an unknown message so that the state of the RaftActor remains unchanged
236             RaftState expected = behavior.handleMessage(getRef(), "unknown");
237
238             RaftState raftState =
239                 behavior.handleMessage(getRef(), appendEntries);
240
241             assertEquals(expected, raftState);
242
243             // Also expect an AppendEntriesReply to be sent where success is false
244             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
245                 "AppendEntriesReply") {
246                 // do not put code outside this method, will run afterwards
247                 protected Boolean match(Object in) {
248                     if (in instanceof AppendEntriesReply) {
249                         AppendEntriesReply reply = (AppendEntriesReply) in;
250                         return reply.isSuccess();
251                     } else {
252                         throw noMatch();
253                     }
254                 }
255             }.get();
256
257             assertEquals(false, out);
258
259
260         }};
261     }
262
263
264
265     /**
266      * This test verifies that when a new AppendEntries message is received with
267      * new entries and the logs of the sender and receiver match that the new
268      * entries get added to the log and the log is incremented by the number of
269      * entries received in appendEntries
270      *
271      * @throws Exception
272      */
273     @Test
274     public void testHandleAppendEntriesAddNewEntries() throws Exception {
275         new JavaTestKit(getSystem()) {{
276
277             MockRaftActorContext context = (MockRaftActorContext)
278                 createActorContext();
279
280             // First set the receivers term to lower number
281             context.getTermInformation().update(1, "test");
282
283             // Prepare the receivers log
284             MockRaftActorContext.SimpleReplicatedLog log =
285                 new MockRaftActorContext.SimpleReplicatedLog();
286             log.append(
287                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
288             log.append(
289                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
290             log.append(
291                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
292
293             context.setReplicatedLog(log);
294
295             // Prepare the entries to be sent with AppendEntries
296             List<ReplicatedLogEntry> entries = new ArrayList<>();
297             entries.add(
298                 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
299             entries.add(
300                 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
301
302             // Send appendEntries with the same term as was set on the receiver
303             // before the new behavior was created (1 in this case)
304             // This will not work for a Candidate because as soon as a Candidate
305             // is created it increments the term
306             AppendEntries appendEntries =
307                 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
308
309             RaftActorBehavior behavior = createBehavior(context);
310
311             // Send an unknown message so that the state of the RaftActor remains unchanged
312             RaftState expected = behavior.handleMessage(getRef(), "unknown");
313
314             RaftState raftState =
315                 behavior.handleMessage(getRef(), appendEntries);
316
317             assertEquals(expected, raftState);
318             assertEquals(5, log.last().getIndex() + 1);
319             assertNotNull(log.get(3));
320             assertNotNull(log.get(4));
321
322             // Also expect an AppendEntriesReply to be sent where success is false
323             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
324                 "AppendEntriesReply") {
325                 // do not put code outside this method, will run afterwards
326                 protected Boolean match(Object in) {
327                     if (in instanceof AppendEntriesReply) {
328                         AppendEntriesReply reply = (AppendEntriesReply) in;
329                         return reply.isSuccess();
330                     } else {
331                         throw noMatch();
332                     }
333                 }
334             }.get();
335
336             assertEquals(true, out);
337
338
339         }};
340     }
341
342
343
344     /**
345      * This test verifies that when a new AppendEntries message is received with
346      * new entries and the logs of the sender and receiver are out-of-sync that
347      * the log is first corrected by removing the out of sync entries from the
348      * log and then adding in the new entries sent with the AppendEntries message
349      *
350      * @throws Exception
351      */
352     @Test
353     public void testHandleAppendEntriesCorrectReceiverLogEntries()
354         throws Exception {
355         new JavaTestKit(getSystem()) {{
356
357             MockRaftActorContext context = (MockRaftActorContext)
358                 createActorContext();
359
360             // First set the receivers term to lower number
361             context.getTermInformation().update(2, "test");
362
363             // Prepare the receivers log
364             MockRaftActorContext.SimpleReplicatedLog log =
365                 new MockRaftActorContext.SimpleReplicatedLog();
366             log.append(
367                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
368             log.append(
369                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
370             log.append(
371                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
372
373             context.setReplicatedLog(log);
374
375             // Prepare the entries to be sent with AppendEntries
376             List<ReplicatedLogEntry> entries = new ArrayList<>();
377             entries.add(
378                 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
379             entries.add(
380                 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
381
382             // Send appendEntries with the same term as was set on the receiver
383             // before the new behavior was created (1 in this case)
384             // This will not work for a Candidate because as soon as a Candidate
385             // is created it increments the term
386             AppendEntries appendEntries =
387                 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
388
389             RaftActorBehavior behavior = createBehavior(context);
390
391             // Send an unknown message so that the state of the RaftActor remains unchanged
392             RaftState expected = behavior.handleMessage(getRef(), "unknown");
393
394             RaftState raftState =
395                 behavior.handleMessage(getRef(), appendEntries);
396
397             assertEquals(expected, raftState);
398
399             // The entry at index 2 will be found out-of-sync with the leader
400             // and will be removed
401             // Then the two new entries will be added to the log
402             // Thus making the log to have 4 entries
403             assertEquals(4, log.last().getIndex() + 1);
404             assertNotNull(log.get(2));
405
406             assertEquals("one", log.get(1).getData().toString());
407
408             // Check that the entry at index 2 has the new data
409             assertEquals("two-1", log.get(2).getData().toString());
410
411             assertEquals("three", log.get(3).getData().toString());
412
413             assertNotNull(log.get(3));
414
415             // Also expect an AppendEntriesReply to be sent where success is false
416             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
417                 "AppendEntriesReply") {
418                 // do not put code outside this method, will run afterwards
419                 protected Boolean match(Object in) {
420                     if (in instanceof AppendEntriesReply) {
421                         AppendEntriesReply reply = (AppendEntriesReply) in;
422                         return reply.isSuccess();
423                     } else {
424                         throw noMatch();
425                     }
426                 }
427             }.get();
428
429             assertEquals(true, out);
430
431
432         }};
433     }
434
435
436     /**
437      * This test verifies that when InstallSnapshot is received by
438      * the follower its applied correctly.
439      *
440      * @throws Exception
441      */
442     @Test
443     public void testHandleInstallSnapshot() throws Exception {
444         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
445
446             ActorRef leaderActor = getSystem().actorOf(Props.create(
447                 MessageCollectorActor.class));
448
449             MockRaftActorContext context = (MockRaftActorContext)
450                 createActorContext(getRef());
451
452             Follower follower = (Follower)createBehavior(context);
453
454             HashMap<String, String> followerSnapshot = new HashMap<>();
455             followerSnapshot.put("1", "A");
456             followerSnapshot.put("2", "B");
457             followerSnapshot.put("3", "C");
458
459             ByteString bsSnapshot  = toByteString(followerSnapshot);
460             ByteString chunkData = ByteString.EMPTY;
461             int offset = 0;
462             int snapshotLength = bsSnapshot.size();
463             int i = 1;
464
465             do {
466                 chunkData = getNextChunk(bsSnapshot, offset);
467                 final InstallSnapshot installSnapshot =
468                     new InstallSnapshot(1, "leader-1", i, 1,
469                         chunkData, i, 3);
470                 follower.handleMessage(leaderActor, installSnapshot);
471                 offset = offset + 50;
472                 i++;
473             } while ((offset+50) < snapshotLength);
474
475             final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
476             follower.handleMessage(leaderActor, installSnapshot3);
477
478             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
479                 @Override
480                 protected String match(Object o) throws Exception {
481                     if (o instanceof ApplySnapshot) {
482                         ApplySnapshot as = (ApplySnapshot)o;
483                         if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
484                             return "applySnapshot-lastIndex-mismatch";
485                         }
486                         if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
487                             return "applySnapshot-lastAppliedTerm-mismatch";
488                         }
489                         if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
490                             return "applySnapshot-lastAppliedIndex-mismatch";
491                         }
492                         if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
493                             return "applySnapshot-lastTerm-mismatch";
494                         }
495                         return "applySnapshot";
496                     }
497
498                     return "ignoreCase";
499                 }
500             }.get();
501
502             String applySnapshotMatch = "";
503             for (String reply: matches) {
504                 if (reply.startsWith("applySnapshot")) {
505                     applySnapshotMatch = reply;
506                 }
507             }
508
509             assertEquals("applySnapshot", applySnapshotMatch);
510
511             Object messages = executeLocalOperation(leaderActor, "get-all-messages");
512
513             assertNotNull(messages);
514             assertTrue(messages instanceof List);
515             List<Object> listMessages = (List<Object>) messages;
516
517             int installSnapshotReplyReceivedCount = 0;
518             for (Object message: listMessages) {
519                 if (message instanceof InstallSnapshotReply) {
520                     ++installSnapshotReplyReceivedCount;
521                 }
522             }
523
524             assertEquals(3, installSnapshotReplyReceivedCount);
525
526         }};
527     }
528
529     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
530         FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
531         Timeout operationTimeout = new Timeout(operationDuration);
532         Future<Object> future = ask(actor, message, operationTimeout);
533
534         try {
535             return Await.result(future, operationDuration);
536         } catch (Exception e) {
537             throw e;
538         }
539     }
540
541     public ByteString getNextChunk (ByteString bs, int offset){
542         int snapshotLength = bs.size();
543         int start = offset;
544         int size = 50;
545         if (50 > snapshotLength) {
546             size = snapshotLength;
547         } else {
548             if ((start + 50) > snapshotLength) {
549                 size = snapshotLength - start;
550             }
551         }
552         return bs.substring(start, start + size);
553     }
554
555     private ByteString toByteString(Map<String, String> state) {
556         ByteArrayOutputStream b = null;
557         ObjectOutputStream o = null;
558         try {
559             try {
560                 b = new ByteArrayOutputStream();
561                 o = new ObjectOutputStream(b);
562                 o.writeObject(state);
563                 byte[] snapshotBytes = b.toByteArray();
564                 return ByteString.copyFrom(snapshotBytes);
565             } finally {
566                 if (o != null) {
567                     o.flush();
568                     o.close();
569                 }
570                 if (b != null) {
571                     b.close();
572                 }
573             }
574         } catch (IOException e) {
575             org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
576         }
577         return null;
578     }
579 }