Merge "BUG 2464 : Shard dataSize does not seem to correspond to actual memory usage"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.Props;
8 import akka.testkit.JavaTestKit;
9 import com.google.protobuf.ByteString;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.ObjectOutputStream;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
24 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33
34 public class FollowerTest extends AbstractRaftActorBehaviorTest {
35
36     private final ActorRef followerActor = getSystem().actorOf(Props.create(
37         DoNothingActor.class));
38
39
40     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
41         return new Follower(actorContext);
42     }
43
44     @Override protected  RaftActorContext createActorContext() {
45         return createActorContext(followerActor);
46     }
47
48     protected  RaftActorContext createActorContext(ActorRef actorRef){
49         return new MockRaftActorContext("test", getSystem(), actorRef);
50     }
51
52     @Test
53     public void testThatAnElectionTimeoutIsTriggered(){
54         new JavaTestKit(getSystem()) {{
55
56             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
57                 protected void run() {
58
59                     Follower follower = new Follower(createActorContext(getTestActor()));
60
61                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
62                         // do not put code outside this method, will run afterwards
63                         protected Boolean match(Object in) {
64                             if (in instanceof ElectionTimeout) {
65                                 return true;
66                             } else {
67                                 throw noMatch();
68                             }
69                         }
70                     }.get();
71
72                     assertEquals(true, out);
73                 }
74             };
75         }};
76     }
77
78     @Test
79     public void testHandleElectionTimeout(){
80         RaftActorContext raftActorContext = createActorContext();
81         Follower follower =
82             new Follower(raftActorContext);
83
84         RaftActorBehavior raftBehavior =
85             follower.handleMessage(followerActor, new ElectionTimeout());
86
87         assertTrue(raftBehavior instanceof Candidate);
88     }
89
90     @Test
91     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
92         new JavaTestKit(getSystem()) {{
93
94             new Within(duration("1 seconds")) {
95                 protected void run() {
96
97                     RaftActorContext context = createActorContext(getTestActor());
98
99                     context.getTermInformation().update(1000, null);
100
101                     RaftActorBehavior follower = createBehavior(context);
102
103                     follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
104
105                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
106                         // do not put code outside this method, will run afterwards
107                         protected Boolean match(Object in) {
108                             if (in instanceof RequestVoteReply) {
109                                 RequestVoteReply reply = (RequestVoteReply) in;
110                                 return reply.isVoteGranted();
111                             } else {
112                                 throw noMatch();
113                             }
114                         }
115                     }.get();
116
117                     assertEquals(true, out);
118                 }
119             };
120         }};
121     }
122
123     @Test
124     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
125         new JavaTestKit(getSystem()) {{
126
127             new Within(duration("1 seconds")) {
128                 protected void run() {
129
130                     RaftActorContext context = createActorContext(getTestActor());
131
132                     context.getTermInformation().update(1000, "test");
133
134                     RaftActorBehavior follower = createBehavior(context);
135
136                     follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
137
138                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
139                         // do not put code outside this method, will run afterwards
140                         protected Boolean match(Object in) {
141                             if (in instanceof RequestVoteReply) {
142                                 RequestVoteReply reply = (RequestVoteReply) in;
143                                 return reply.isVoteGranted();
144                             } else {
145                                 throw noMatch();
146                             }
147                         }
148                     }.get();
149
150                     assertEquals(false, out);
151                 }
152             };
153         }};
154     }
155
156     /**
157      * This test verifies that when an AppendEntries RPC is received by a RaftActor
158      * with a commitIndex that is greater than what has been applied to the
159      * state machine of the RaftActor, the RaftActor applies the state and
160      * sets it current applied state to the commitIndex of the sender.
161      *
162      * @throws Exception
163      */
164     @Test
165     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
166         new JavaTestKit(getSystem()) {{
167
168             RaftActorContext context =
169                 createActorContext();
170
171             context.setLastApplied(100);
172             setLastLogEntry((MockRaftActorContext) context, 1, 100,
173                 new MockRaftActorContext.MockPayload(""));
174             ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
175
176             List<ReplicatedLogEntry> entries =
177                 Arrays.asList(
178                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
179                                 new MockRaftActorContext.MockPayload("foo"))
180                 );
181
182             // The new commitIndex is 101
183             AppendEntries appendEntries =
184                 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
185
186             RaftActorBehavior raftBehavior =
187                 createBehavior(context).handleMessage(getRef(), appendEntries);
188
189             assertEquals(101L, context.getLastApplied());
190
191         }};
192     }
193
194     /**
195      * This test verifies that when an AppendEntries is received a specific prevLogTerm
196      * which does not match the term that is in RaftActors log entry at prevLogIndex
197      * then the RaftActor does not change it's state and it returns a failure.
198      *
199      * @throws Exception
200      */
201     @Test
202     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
203         throws Exception {
204         new JavaTestKit(getSystem()) {{
205
206             MockRaftActorContext context = (MockRaftActorContext)
207                 createActorContext();
208
209             // First set the receivers term to lower number
210             context.getTermInformation().update(95, "test");
211
212             // Set the last log entry term for the receiver to be greater than
213             // what we will be sending as the prevLogTerm in AppendEntries
214             MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
215                 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
216
217             // AppendEntries is now sent with a bigger term
218             // this will set the receivers term to be the same as the sender's term
219             AppendEntries appendEntries =
220                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
221
222             RaftActorBehavior behavior = createBehavior(context);
223
224             // Send an unknown message so that the state of the RaftActor remains unchanged
225             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
226
227             RaftActorBehavior raftBehavior =
228                 behavior.handleMessage(getRef(), appendEntries);
229
230             assertEquals(expected, raftBehavior);
231
232             // Also expect an AppendEntriesReply to be sent where success is false
233             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
234                 "AppendEntriesReply") {
235                 // do not put code outside this method, will run afterwards
236                 protected Boolean match(Object in) {
237                     if (in instanceof AppendEntriesReply) {
238                         AppendEntriesReply reply = (AppendEntriesReply) in;
239                         return reply.isSuccess();
240                     } else {
241                         throw noMatch();
242                     }
243                 }
244             }.get();
245
246             assertEquals(false, out);
247
248
249         }};
250     }
251
252
253
254     /**
255      * This test verifies that when a new AppendEntries message is received with
256      * new entries and the logs of the sender and receiver match that the new
257      * entries get added to the log and the log is incremented by the number of
258      * entries received in appendEntries
259      *
260      * @throws Exception
261      */
262     @Test
263     public void testHandleAppendEntriesAddNewEntries() throws Exception {
264         new JavaTestKit(getSystem()) {{
265
266             MockRaftActorContext context = (MockRaftActorContext)
267                 createActorContext();
268
269             // First set the receivers term to lower number
270             context.getTermInformation().update(1, "test");
271
272             // Prepare the receivers log
273             MockRaftActorContext.SimpleReplicatedLog log =
274                 new MockRaftActorContext.SimpleReplicatedLog();
275             log.append(
276                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
277             log.append(
278                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
279             log.append(
280                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
281
282             context.setReplicatedLog(log);
283
284             // Prepare the entries to be sent with AppendEntries
285             List<ReplicatedLogEntry> entries = new ArrayList<>();
286             entries.add(
287                 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
288             entries.add(
289                 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
290
291             // Send appendEntries with the same term as was set on the receiver
292             // before the new behavior was created (1 in this case)
293             // This will not work for a Candidate because as soon as a Candidate
294             // is created it increments the term
295             AppendEntries appendEntries =
296                 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
297
298             RaftActorBehavior behavior = createBehavior(context);
299
300             // Send an unknown message so that the state of the RaftActor remains unchanged
301             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
302
303             RaftActorBehavior raftBehavior =
304                 behavior.handleMessage(getRef(), appendEntries);
305
306             assertEquals(expected, raftBehavior);
307             assertEquals(5, log.last().getIndex() + 1);
308             assertNotNull(log.get(3));
309             assertNotNull(log.get(4));
310
311             // Also expect an AppendEntriesReply to be sent where success is false
312             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
313                 "AppendEntriesReply") {
314                 // do not put code outside this method, will run afterwards
315                 protected Boolean match(Object in) {
316                     if (in instanceof AppendEntriesReply) {
317                         AppendEntriesReply reply = (AppendEntriesReply) in;
318                         return reply.isSuccess();
319                     } else {
320                         throw noMatch();
321                     }
322                 }
323             }.get();
324
325             assertEquals(true, out);
326
327
328         }};
329     }
330
331
332
333     /**
334      * This test verifies that when a new AppendEntries message is received with
335      * new entries and the logs of the sender and receiver are out-of-sync that
336      * the log is first corrected by removing the out of sync entries from the
337      * log and then adding in the new entries sent with the AppendEntries message
338      *
339      * @throws Exception
340      */
341     @Test
342     public void testHandleAppendEntriesCorrectReceiverLogEntries()
343         throws Exception {
344         new JavaTestKit(getSystem()) {{
345
346             MockRaftActorContext context = (MockRaftActorContext)
347                 createActorContext();
348
349             // First set the receivers term to lower number
350             context.getTermInformation().update(2, "test");
351
352             // Prepare the receivers log
353             MockRaftActorContext.SimpleReplicatedLog log =
354                 new MockRaftActorContext.SimpleReplicatedLog();
355             log.append(
356                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
357             log.append(
358                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
359             log.append(
360                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
361
362             context.setReplicatedLog(log);
363
364             // Prepare the entries to be sent with AppendEntries
365             List<ReplicatedLogEntry> entries = new ArrayList<>();
366             entries.add(
367                 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
368             entries.add(
369                 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
370
371             // Send appendEntries with the same term as was set on the receiver
372             // before the new behavior was created (1 in this case)
373             // This will not work for a Candidate because as soon as a Candidate
374             // is created it increments the term
375             AppendEntries appendEntries =
376                 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
377
378             RaftActorBehavior behavior = createBehavior(context);
379
380             // Send an unknown message so that the state of the RaftActor remains unchanged
381             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
382
383             RaftActorBehavior raftBehavior =
384                 behavior.handleMessage(getRef(), appendEntries);
385
386             assertEquals(expected, raftBehavior);
387
388             // The entry at index 2 will be found out-of-sync with the leader
389             // and will be removed
390             // Then the two new entries will be added to the log
391             // Thus making the log to have 4 entries
392             assertEquals(4, log.last().getIndex() + 1);
393             assertNotNull(log.get(2));
394
395             assertEquals("one", log.get(1).getData().toString());
396
397             // Check that the entry at index 2 has the new data
398             assertEquals("two-1", log.get(2).getData().toString());
399
400             assertEquals("three", log.get(3).getData().toString());
401
402             assertNotNull(log.get(3));
403
404             // Also expect an AppendEntriesReply to be sent where success is false
405             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
406                 "AppendEntriesReply") {
407                 // do not put code outside this method, will run afterwards
408                 protected Boolean match(Object in) {
409                     if (in instanceof AppendEntriesReply) {
410                         AppendEntriesReply reply = (AppendEntriesReply) in;
411                         return reply.isSuccess();
412                     } else {
413                         throw noMatch();
414                     }
415                 }
416             }.get();
417
418             assertEquals(true, out);
419
420
421         }};
422     }
423
424
425     /**
426      * This test verifies that when InstallSnapshot is received by
427      * the follower its applied correctly.
428      *
429      * @throws Exception
430      */
431     @Test
432     public void testHandleInstallSnapshot() throws Exception {
433         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
434
435             ActorRef leaderActor = getSystem().actorOf(Props.create(
436                 MessageCollectorActor.class));
437
438             MockRaftActorContext context = (MockRaftActorContext)
439                 createActorContext(getRef());
440
441             Follower follower = (Follower)createBehavior(context);
442
443             HashMap<String, String> followerSnapshot = new HashMap<>();
444             followerSnapshot.put("1", "A");
445             followerSnapshot.put("2", "B");
446             followerSnapshot.put("3", "C");
447
448             ByteString bsSnapshot  = toByteString(followerSnapshot);
449             ByteString chunkData = ByteString.EMPTY;
450             int offset = 0;
451             int snapshotLength = bsSnapshot.size();
452             int i = 1;
453             int chunkIndex = 1;
454
455             do {
456                 chunkData = getNextChunk(bsSnapshot, offset);
457                 final InstallSnapshot installSnapshot =
458                     new InstallSnapshot(1, "leader-1", i, 1,
459                         chunkData, chunkIndex, 3);
460                 follower.handleMessage(leaderActor, installSnapshot);
461                 offset = offset + 50;
462                 i++;
463                 chunkIndex++;
464             } while ((offset+50) < snapshotLength);
465
466             final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
467             follower.handleMessage(leaderActor, installSnapshot3);
468
469             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
470                 @Override
471                 protected String match(Object o) throws Exception {
472                     if (o instanceof ApplySnapshot) {
473                         ApplySnapshot as = (ApplySnapshot)o;
474                         if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
475                             return "applySnapshot-lastIndex-mismatch";
476                         }
477                         if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
478                             return "applySnapshot-lastAppliedTerm-mismatch";
479                         }
480                         if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
481                             return "applySnapshot-lastAppliedIndex-mismatch";
482                         }
483                         if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
484                             return "applySnapshot-lastTerm-mismatch";
485                         }
486                         return "applySnapshot";
487                     }
488
489                     return "ignoreCase";
490                 }
491             }.get();
492
493             // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
494             assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
495
496             String applySnapshotMatch = "";
497             for (String reply: matches) {
498                 if (reply.startsWith("applySnapshot")) {
499                     applySnapshotMatch = reply;
500                 }
501             }
502
503             assertEquals("applySnapshot", applySnapshotMatch);
504
505             Object messages = executeLocalOperation(leaderActor, "get-all-messages");
506
507             assertNotNull(messages);
508             assertTrue(messages instanceof List);
509             List<Object> listMessages = (List<Object>) messages;
510
511             int installSnapshotReplyReceivedCount = 0;
512             for (Object message: listMessages) {
513                 if (message instanceof InstallSnapshotReply) {
514                     ++installSnapshotReplyReceivedCount;
515                 }
516             }
517
518             assertEquals(3, installSnapshotReplyReceivedCount);
519
520         }};
521     }
522
523     @Test
524     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
525         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
526             {
527
528                 ActorRef leaderActor = getSystem().actorOf(Props.create(
529                         MessageCollectorActor.class));
530
531                 MockRaftActorContext context = (MockRaftActorContext)
532                         createActorContext(getRef());
533
534                 Follower follower = (Follower) createBehavior(context);
535
536                 HashMap<String, String> followerSnapshot = new HashMap<>();
537                 followerSnapshot.put("1", "A");
538                 followerSnapshot.put("2", "B");
539                 followerSnapshot.put("3", "C");
540
541                 ByteString bsSnapshot = toByteString(followerSnapshot);
542
543                 final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
544                 follower.handleMessage(leaderActor, installSnapshot);
545
546                 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
547
548                 assertNotNull(messages);
549                 assertTrue(messages instanceof List);
550                 List<Object> listMessages = (List<Object>) messages;
551
552                 int installSnapshotReplyReceivedCount = 0;
553                 for (Object message: listMessages) {
554                     if (message instanceof InstallSnapshotReply) {
555                         ++installSnapshotReplyReceivedCount;
556                     }
557                 }
558
559                 assertEquals(1, installSnapshotReplyReceivedCount);
560                 InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
561                 assertEquals(false, reply.isSuccess());
562                 assertEquals(-1, reply.getChunkIndex());
563                 assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
564
565
566             }};
567     }
568
569     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
570         return MessageCollectorActor.getAllMessages(actor);
571     }
572
573     public ByteString getNextChunk (ByteString bs, int offset){
574         int snapshotLength = bs.size();
575         int start = offset;
576         int size = 50;
577         if (50 > snapshotLength) {
578             size = snapshotLength;
579         } else {
580             if ((start + 50) > snapshotLength) {
581                 size = snapshotLength - start;
582             }
583         }
584         return bs.substring(start, start + size);
585     }
586
587     private ByteString toByteString(Map<String, String> state) {
588         ByteArrayOutputStream b = null;
589         ObjectOutputStream o = null;
590         try {
591             try {
592                 b = new ByteArrayOutputStream();
593                 o = new ObjectOutputStream(b);
594                 o.writeObject(state);
595                 byte[] snapshotBytes = b.toByteArray();
596                 return ByteString.copyFrom(snapshotBytes);
597             } finally {
598                 if (o != null) {
599                     o.flush();
600                     o.close();
601                 }
602                 if (b != null) {
603                     b.close();
604                 }
605             }
606         } catch (IOException e) {
607             org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
608         }
609         return null;
610     }
611 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.