Merge "BUG 2317 : StatisticsManager does not unregister from yang notifications on...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import com.google.protobuf.ByteString;
7 import org.junit.Test;
8 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
9 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
10 import org.opendaylight.controller.cluster.raft.RaftActorContext;
11 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
12 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
13 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
14 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
15 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
16 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
17 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
18 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
20 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
21 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
22
23 import java.io.ByteArrayOutputStream;
24 import java.io.IOException;
25 import java.io.ObjectOutputStream;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31
32 import static org.junit.Assert.assertEquals;
33 import static org.junit.Assert.assertNotNull;
34 import static org.junit.Assert.assertTrue;
35
36 public class FollowerTest extends AbstractRaftActorBehaviorTest {
37
38     private final ActorRef followerActor = getSystem().actorOf(Props.create(
39         DoNothingActor.class));
40
41
42     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
43         return new Follower(actorContext);
44     }
45
46     @Override protected  RaftActorContext createActorContext() {
47         return createActorContext(followerActor);
48     }
49
50     protected  RaftActorContext createActorContext(ActorRef actorRef){
51         return new MockRaftActorContext("test", getSystem(), actorRef);
52     }
53
54     @Test
55     public void testThatAnElectionTimeoutIsTriggered(){
56         new JavaTestKit(getSystem()) {{
57
58             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
59                 protected void run() {
60
61                     Follower follower = new Follower(createActorContext(getTestActor()));
62
63                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
64                         // do not put code outside this method, will run afterwards
65                         protected Boolean match(Object in) {
66                             if (in instanceof ElectionTimeout) {
67                                 return true;
68                             } else {
69                                 throw noMatch();
70                             }
71                         }
72                     }.get();
73
74                     assertEquals(true, out);
75                 }
76             };
77         }};
78     }
79
80     @Test
81     public void testHandleElectionTimeout(){
82         RaftActorContext raftActorContext = createActorContext();
83         Follower follower =
84             new Follower(raftActorContext);
85
86         RaftActorBehavior raftBehavior =
87             follower.handleMessage(followerActor, new ElectionTimeout());
88
89         assertTrue(raftBehavior instanceof Candidate);
90     }
91
92     @Test
93     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
94         new JavaTestKit(getSystem()) {{
95
96             new Within(duration("1 seconds")) {
97                 protected void run() {
98
99                     RaftActorContext context = createActorContext(getTestActor());
100
101                     context.getTermInformation().update(1000, null);
102
103                     RaftActorBehavior follower = createBehavior(context);
104
105                     follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
106
107                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
108                         // do not put code outside this method, will run afterwards
109                         protected Boolean match(Object in) {
110                             if (in instanceof RequestVoteReply) {
111                                 RequestVoteReply reply = (RequestVoteReply) in;
112                                 return reply.isVoteGranted();
113                             } else {
114                                 throw noMatch();
115                             }
116                         }
117                     }.get();
118
119                     assertEquals(true, out);
120                 }
121             };
122         }};
123     }
124
125     @Test
126     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
127         new JavaTestKit(getSystem()) {{
128
129             new Within(duration("1 seconds")) {
130                 protected void run() {
131
132                     RaftActorContext context = createActorContext(getTestActor());
133
134                     context.getTermInformation().update(1000, "test");
135
136                     RaftActorBehavior follower = createBehavior(context);
137
138                     follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
139
140                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
141                         // do not put code outside this method, will run afterwards
142                         protected Boolean match(Object in) {
143                             if (in instanceof RequestVoteReply) {
144                                 RequestVoteReply reply = (RequestVoteReply) in;
145                                 return reply.isVoteGranted();
146                             } else {
147                                 throw noMatch();
148                             }
149                         }
150                     }.get();
151
152                     assertEquals(false, out);
153                 }
154             };
155         }};
156     }
157
158     /**
159      * This test verifies that when an AppendEntries RPC is received by a RaftActor
160      * with a commitIndex that is greater than what has been applied to the
161      * state machine of the RaftActor, the RaftActor applies the state and
162      * sets it current applied state to the commitIndex of the sender.
163      *
164      * @throws Exception
165      */
166     @Test
167     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
168         new JavaTestKit(getSystem()) {{
169
170             RaftActorContext context =
171                 createActorContext();
172
173             context.setLastApplied(100);
174             setLastLogEntry((MockRaftActorContext) context, 1, 100,
175                 new MockRaftActorContext.MockPayload(""));
176             ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
177
178             List<ReplicatedLogEntry> entries =
179                 Arrays.asList(
180                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
181                                 new MockRaftActorContext.MockPayload("foo"))
182                 );
183
184             // The new commitIndex is 101
185             AppendEntries appendEntries =
186                 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
187
188             RaftActorBehavior raftBehavior =
189                 createBehavior(context).handleMessage(getRef(), appendEntries);
190
191             assertEquals(101L, context.getLastApplied());
192
193         }};
194     }
195
196     /**
197      * This test verifies that when an AppendEntries is received a specific prevLogTerm
198      * which does not match the term that is in RaftActors log entry at prevLogIndex
199      * then the RaftActor does not change it's state and it returns a failure.
200      *
201      * @throws Exception
202      */
203     @Test
204     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
205         throws Exception {
206         new JavaTestKit(getSystem()) {{
207
208             MockRaftActorContext context = (MockRaftActorContext)
209                 createActorContext();
210
211             // First set the receivers term to lower number
212             context.getTermInformation().update(95, "test");
213
214             // Set the last log entry term for the receiver to be greater than
215             // what we will be sending as the prevLogTerm in AppendEntries
216             MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
217                 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
218
219             // AppendEntries is now sent with a bigger term
220             // this will set the receivers term to be the same as the sender's term
221             AppendEntries appendEntries =
222                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
223
224             RaftActorBehavior behavior = createBehavior(context);
225
226             // Send an unknown message so that the state of the RaftActor remains unchanged
227             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
228
229             RaftActorBehavior raftBehavior =
230                 behavior.handleMessage(getRef(), appendEntries);
231
232             assertEquals(expected, raftBehavior);
233
234             // Also expect an AppendEntriesReply to be sent where success is false
235             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
236                 "AppendEntriesReply") {
237                 // do not put code outside this method, will run afterwards
238                 protected Boolean match(Object in) {
239                     if (in instanceof AppendEntriesReply) {
240                         AppendEntriesReply reply = (AppendEntriesReply) in;
241                         return reply.isSuccess();
242                     } else {
243                         throw noMatch();
244                     }
245                 }
246             }.get();
247
248             assertEquals(false, out);
249
250
251         }};
252     }
253
254
255
256     /**
257      * This test verifies that when a new AppendEntries message is received with
258      * new entries and the logs of the sender and receiver match that the new
259      * entries get added to the log and the log is incremented by the number of
260      * entries received in appendEntries
261      *
262      * @throws Exception
263      */
264     @Test
265     public void testHandleAppendEntriesAddNewEntries() throws Exception {
266         new JavaTestKit(getSystem()) {{
267
268             MockRaftActorContext context = (MockRaftActorContext)
269                 createActorContext();
270
271             // First set the receivers term to lower number
272             context.getTermInformation().update(1, "test");
273
274             // Prepare the receivers log
275             MockRaftActorContext.SimpleReplicatedLog log =
276                 new MockRaftActorContext.SimpleReplicatedLog();
277             log.append(
278                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
279             log.append(
280                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
281             log.append(
282                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
283
284             context.setReplicatedLog(log);
285
286             // Prepare the entries to be sent with AppendEntries
287             List<ReplicatedLogEntry> entries = new ArrayList<>();
288             entries.add(
289                 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
290             entries.add(
291                 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
292
293             // Send appendEntries with the same term as was set on the receiver
294             // before the new behavior was created (1 in this case)
295             // This will not work for a Candidate because as soon as a Candidate
296             // is created it increments the term
297             AppendEntries appendEntries =
298                 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
299
300             RaftActorBehavior behavior = createBehavior(context);
301
302             // Send an unknown message so that the state of the RaftActor remains unchanged
303             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
304
305             RaftActorBehavior raftBehavior =
306                 behavior.handleMessage(getRef(), appendEntries);
307
308             assertEquals(expected, raftBehavior);
309             assertEquals(5, log.last().getIndex() + 1);
310             assertNotNull(log.get(3));
311             assertNotNull(log.get(4));
312
313             // Also expect an AppendEntriesReply to be sent where success is false
314             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
315                 "AppendEntriesReply") {
316                 // do not put code outside this method, will run afterwards
317                 protected Boolean match(Object in) {
318                     if (in instanceof AppendEntriesReply) {
319                         AppendEntriesReply reply = (AppendEntriesReply) in;
320                         return reply.isSuccess();
321                     } else {
322                         throw noMatch();
323                     }
324                 }
325             }.get();
326
327             assertEquals(true, out);
328
329
330         }};
331     }
332
333
334
335     /**
336      * This test verifies that when a new AppendEntries message is received with
337      * new entries and the logs of the sender and receiver are out-of-sync that
338      * the log is first corrected by removing the out of sync entries from the
339      * log and then adding in the new entries sent with the AppendEntries message
340      *
341      * @throws Exception
342      */
343     @Test
344     public void testHandleAppendEntriesCorrectReceiverLogEntries()
345         throws Exception {
346         new JavaTestKit(getSystem()) {{
347
348             MockRaftActorContext context = (MockRaftActorContext)
349                 createActorContext();
350
351             // First set the receivers term to lower number
352             context.getTermInformation().update(2, "test");
353
354             // Prepare the receivers log
355             MockRaftActorContext.SimpleReplicatedLog log =
356                 new MockRaftActorContext.SimpleReplicatedLog();
357             log.append(
358                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
359             log.append(
360                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
361             log.append(
362                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
363
364             context.setReplicatedLog(log);
365
366             // Prepare the entries to be sent with AppendEntries
367             List<ReplicatedLogEntry> entries = new ArrayList<>();
368             entries.add(
369                 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
370             entries.add(
371                 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
372
373             // Send appendEntries with the same term as was set on the receiver
374             // before the new behavior was created (1 in this case)
375             // This will not work for a Candidate because as soon as a Candidate
376             // is created it increments the term
377             AppendEntries appendEntries =
378                 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
379
380             RaftActorBehavior behavior = createBehavior(context);
381
382             // Send an unknown message so that the state of the RaftActor remains unchanged
383             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
384
385             RaftActorBehavior raftBehavior =
386                 behavior.handleMessage(getRef(), appendEntries);
387
388             assertEquals(expected, raftBehavior);
389
390             // The entry at index 2 will be found out-of-sync with the leader
391             // and will be removed
392             // Then the two new entries will be added to the log
393             // Thus making the log to have 4 entries
394             assertEquals(4, log.last().getIndex() + 1);
395             assertNotNull(log.get(2));
396
397             assertEquals("one", log.get(1).getData().toString());
398
399             // Check that the entry at index 2 has the new data
400             assertEquals("two-1", log.get(2).getData().toString());
401
402             assertEquals("three", log.get(3).getData().toString());
403
404             assertNotNull(log.get(3));
405
406             // Also expect an AppendEntriesReply to be sent where success is false
407             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
408                 "AppendEntriesReply") {
409                 // do not put code outside this method, will run afterwards
410                 protected Boolean match(Object in) {
411                     if (in instanceof AppendEntriesReply) {
412                         AppendEntriesReply reply = (AppendEntriesReply) in;
413                         return reply.isSuccess();
414                     } else {
415                         throw noMatch();
416                     }
417                 }
418             }.get();
419
420             assertEquals(true, out);
421
422
423         }};
424     }
425
426
427     /**
428      * This test verifies that when InstallSnapshot is received by
429      * the follower its applied correctly.
430      *
431      * @throws Exception
432      */
433     @Test
434     public void testHandleInstallSnapshot() throws Exception {
435         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
436
437             ActorRef leaderActor = getSystem().actorOf(Props.create(
438                 MessageCollectorActor.class));
439
440             MockRaftActorContext context = (MockRaftActorContext)
441                 createActorContext(getRef());
442
443             Follower follower = (Follower)createBehavior(context);
444
445             HashMap<String, String> followerSnapshot = new HashMap<>();
446             followerSnapshot.put("1", "A");
447             followerSnapshot.put("2", "B");
448             followerSnapshot.put("3", "C");
449
450             ByteString bsSnapshot  = toByteString(followerSnapshot);
451             ByteString chunkData = ByteString.EMPTY;
452             int offset = 0;
453             int snapshotLength = bsSnapshot.size();
454             int i = 1;
455
456             do {
457                 chunkData = getNextChunk(bsSnapshot, offset);
458                 final InstallSnapshot installSnapshot =
459                     new InstallSnapshot(1, "leader-1", i, 1,
460                         chunkData, i, 3);
461                 follower.handleMessage(leaderActor, installSnapshot);
462                 offset = offset + 50;
463                 i++;
464             } while ((offset+50) < snapshotLength);
465
466             final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
467             follower.handleMessage(leaderActor, installSnapshot3);
468
469             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
470                 @Override
471                 protected String match(Object o) throws Exception {
472                     if (o instanceof ApplySnapshot) {
473                         ApplySnapshot as = (ApplySnapshot)o;
474                         if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
475                             return "applySnapshot-lastIndex-mismatch";
476                         }
477                         if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
478                             return "applySnapshot-lastAppliedTerm-mismatch";
479                         }
480                         if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
481                             return "applySnapshot-lastAppliedIndex-mismatch";
482                         }
483                         if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
484                             return "applySnapshot-lastTerm-mismatch";
485                         }
486                         return "applySnapshot";
487                     }
488
489                     return "ignoreCase";
490                 }
491             }.get();
492
493             String applySnapshotMatch = "";
494             for (String reply: matches) {
495                 if (reply.startsWith("applySnapshot")) {
496                     applySnapshotMatch = reply;
497                 }
498             }
499
500             assertEquals("applySnapshot", applySnapshotMatch);
501
502             Object messages = executeLocalOperation(leaderActor, "get-all-messages");
503
504             assertNotNull(messages);
505             assertTrue(messages instanceof List);
506             List<Object> listMessages = (List<Object>) messages;
507
508             int installSnapshotReplyReceivedCount = 0;
509             for (Object message: listMessages) {
510                 if (message instanceof InstallSnapshotReply) {
511                     ++installSnapshotReplyReceivedCount;
512                 }
513             }
514
515             assertEquals(3, installSnapshotReplyReceivedCount);
516
517         }};
518     }
519
520     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
521         return MessageCollectorActor.getAllMessages(actor);
522     }
523
524     public ByteString getNextChunk (ByteString bs, int offset){
525         int snapshotLength = bs.size();
526         int start = offset;
527         int size = 50;
528         if (50 > snapshotLength) {
529             size = snapshotLength;
530         } else {
531             if ((start + 50) > snapshotLength) {
532                 size = snapshotLength - start;
533             }
534         }
535         return bs.substring(start, start + size);
536     }
537
538     private ByteString toByteString(Map<String, String> state) {
539         ByteArrayOutputStream b = null;
540         ObjectOutputStream o = null;
541         try {
542             try {
543                 b = new ByteArrayOutputStream();
544                 o = new ObjectOutputStream(b);
545                 o.writeObject(state);
546                 byte[] snapshotBytes = b.toByteArray();
547                 return ByteString.copyFrom(snapshotBytes);
548             } finally {
549                 if (o != null) {
550                     o.flush();
551                     o.close();
552                 }
553                 if (b != null) {
554                     b.close();
555                 }
556             }
557         } catch (IOException e) {
558             org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
559         }
560         return null;
561     }
562 }