[Fix for Bug 1631]
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import com.google.protobuf.ByteString;
7 import junit.framework.Assert;
8 import org.junit.Test;
9 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
10 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
11 import org.opendaylight.controller.cluster.raft.RaftActorContext;
12 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
13 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
14 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
15 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
16 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
17 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
18 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
21 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
22 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.ObjectOutputStream;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32
33 import static org.junit.Assert.assertEquals;
34 import static org.junit.Assert.assertNotNull;
35 import static org.junit.Assert.assertTrue;
36
37 public class FollowerTest extends AbstractRaftActorBehaviorTest {
38
39     private final ActorRef followerActor = getSystem().actorOf(Props.create(
40         DoNothingActor.class));
41
42
43     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
44         return new Follower(actorContext);
45     }
46
47     @Override protected  RaftActorContext createActorContext() {
48         return createActorContext(followerActor);
49     }
50
51     protected  RaftActorContext createActorContext(ActorRef actorRef){
52         return new MockRaftActorContext("test", getSystem(), actorRef);
53     }
54
55     @Test
56     public void testThatAnElectionTimeoutIsTriggered(){
57         new JavaTestKit(getSystem()) {{
58
59             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
60                 protected void run() {
61
62                     Follower follower = new Follower(createActorContext(getTestActor()));
63
64                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
65                         // do not put code outside this method, will run afterwards
66                         protected Boolean match(Object in) {
67                             if (in instanceof ElectionTimeout) {
68                                 return true;
69                             } else {
70                                 throw noMatch();
71                             }
72                         }
73                     }.get();
74
75                     assertEquals(true, out);
76                 }
77             };
78         }};
79     }
80
81     @Test
82     public void testHandleElectionTimeout(){
83         RaftActorContext raftActorContext = createActorContext();
84         Follower follower =
85             new Follower(raftActorContext);
86
87         RaftActorBehavior raftBehavior =
88             follower.handleMessage(followerActor, new ElectionTimeout());
89
90         Assert.assertTrue(raftBehavior instanceof Candidate);
91     }
92
93     @Test
94     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
95         new JavaTestKit(getSystem()) {{
96
97             new Within(duration("1 seconds")) {
98                 protected void run() {
99
100                     RaftActorContext context = createActorContext(getTestActor());
101
102                     context.getTermInformation().update(1000, null);
103
104                     RaftActorBehavior follower = createBehavior(context);
105
106                     follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
107
108                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
109                         // do not put code outside this method, will run afterwards
110                         protected Boolean match(Object in) {
111                             if (in instanceof RequestVoteReply) {
112                                 RequestVoteReply reply = (RequestVoteReply) in;
113                                 return reply.isVoteGranted();
114                             } else {
115                                 throw noMatch();
116                             }
117                         }
118                     }.get();
119
120                     assertEquals(true, out);
121                 }
122             };
123         }};
124     }
125
126     @Test
127     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
128         new JavaTestKit(getSystem()) {{
129
130             new Within(duration("1 seconds")) {
131                 protected void run() {
132
133                     RaftActorContext context = createActorContext(getTestActor());
134
135                     context.getTermInformation().update(1000, "test");
136
137                     RaftActorBehavior follower = createBehavior(context);
138
139                     follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
140
141                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
142                         // do not put code outside this method, will run afterwards
143                         protected Boolean match(Object in) {
144                             if (in instanceof RequestVoteReply) {
145                                 RequestVoteReply reply = (RequestVoteReply) in;
146                                 return reply.isVoteGranted();
147                             } else {
148                                 throw noMatch();
149                             }
150                         }
151                     }.get();
152
153                     assertEquals(false, out);
154                 }
155             };
156         }};
157     }
158
159     /**
160      * This test verifies that when an AppendEntries RPC is received by a RaftActor
161      * with a commitIndex that is greater than what has been applied to the
162      * state machine of the RaftActor, the RaftActor applies the state and
163      * sets it current applied state to the commitIndex of the sender.
164      *
165      * @throws Exception
166      */
167     @Test
168     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
169         new JavaTestKit(getSystem()) {{
170
171             RaftActorContext context =
172                 createActorContext();
173
174             context.setLastApplied(100);
175             setLastLogEntry((MockRaftActorContext) context, 1, 100,
176                 new MockRaftActorContext.MockPayload(""));
177             ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
178
179             List<ReplicatedLogEntry> entries =
180                 Arrays.asList(
181                         (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
182                                 new MockRaftActorContext.MockPayload("foo"))
183                 );
184
185             // The new commitIndex is 101
186             AppendEntries appendEntries =
187                 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
188
189             RaftActorBehavior raftBehavior =
190                 createBehavior(context).handleMessage(getRef(), appendEntries);
191
192             assertEquals(101L, context.getLastApplied());
193
194         }};
195     }
196
197     /**
198      * This test verifies that when an AppendEntries is received a specific prevLogTerm
199      * which does not match the term that is in RaftActors log entry at prevLogIndex
200      * then the RaftActor does not change it's state and it returns a failure.
201      *
202      * @throws Exception
203      */
204     @Test
205     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
206         throws Exception {
207         new JavaTestKit(getSystem()) {{
208
209             MockRaftActorContext context = (MockRaftActorContext)
210                 createActorContext();
211
212             // First set the receivers term to lower number
213             context.getTermInformation().update(95, "test");
214
215             // Set the last log entry term for the receiver to be greater than
216             // what we will be sending as the prevLogTerm in AppendEntries
217             MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
218                 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
219
220             // AppendEntries is now sent with a bigger term
221             // this will set the receivers term to be the same as the sender's term
222             AppendEntries appendEntries =
223                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
224
225             RaftActorBehavior behavior = createBehavior(context);
226
227             // Send an unknown message so that the state of the RaftActor remains unchanged
228             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
229
230             RaftActorBehavior raftBehavior =
231                 behavior.handleMessage(getRef(), appendEntries);
232
233             assertEquals(expected, raftBehavior);
234
235             // Also expect an AppendEntriesReply to be sent where success is false
236             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
237                 "AppendEntriesReply") {
238                 // do not put code outside this method, will run afterwards
239                 protected Boolean match(Object in) {
240                     if (in instanceof AppendEntriesReply) {
241                         AppendEntriesReply reply = (AppendEntriesReply) in;
242                         return reply.isSuccess();
243                     } else {
244                         throw noMatch();
245                     }
246                 }
247             }.get();
248
249             assertEquals(false, out);
250
251
252         }};
253     }
254
255
256
257     /**
258      * This test verifies that when a new AppendEntries message is received with
259      * new entries and the logs of the sender and receiver match that the new
260      * entries get added to the log and the log is incremented by the number of
261      * entries received in appendEntries
262      *
263      * @throws Exception
264      */
265     @Test
266     public void testHandleAppendEntriesAddNewEntries() throws Exception {
267         new JavaTestKit(getSystem()) {{
268
269             MockRaftActorContext context = (MockRaftActorContext)
270                 createActorContext();
271
272             // First set the receivers term to lower number
273             context.getTermInformation().update(1, "test");
274
275             // Prepare the receivers log
276             MockRaftActorContext.SimpleReplicatedLog log =
277                 new MockRaftActorContext.SimpleReplicatedLog();
278             log.append(
279                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
280             log.append(
281                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
282             log.append(
283                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
284
285             context.setReplicatedLog(log);
286
287             // Prepare the entries to be sent with AppendEntries
288             List<ReplicatedLogEntry> entries = new ArrayList<>();
289             entries.add(
290                 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
291             entries.add(
292                 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
293
294             // Send appendEntries with the same term as was set on the receiver
295             // before the new behavior was created (1 in this case)
296             // This will not work for a Candidate because as soon as a Candidate
297             // is created it increments the term
298             AppendEntries appendEntries =
299                 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
300
301             RaftActorBehavior behavior = createBehavior(context);
302
303             // Send an unknown message so that the state of the RaftActor remains unchanged
304             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
305
306             RaftActorBehavior raftBehavior =
307                 behavior.handleMessage(getRef(), appendEntries);
308
309             assertEquals(expected, raftBehavior);
310             assertEquals(5, log.last().getIndex() + 1);
311             assertNotNull(log.get(3));
312             assertNotNull(log.get(4));
313
314             // Also expect an AppendEntriesReply to be sent where success is false
315             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
316                 "AppendEntriesReply") {
317                 // do not put code outside this method, will run afterwards
318                 protected Boolean match(Object in) {
319                     if (in instanceof AppendEntriesReply) {
320                         AppendEntriesReply reply = (AppendEntriesReply) in;
321                         return reply.isSuccess();
322                     } else {
323                         throw noMatch();
324                     }
325                 }
326             }.get();
327
328             assertEquals(true, out);
329
330
331         }};
332     }
333
334
335
336     /**
337      * This test verifies that when a new AppendEntries message is received with
338      * new entries and the logs of the sender and receiver are out-of-sync that
339      * the log is first corrected by removing the out of sync entries from the
340      * log and then adding in the new entries sent with the AppendEntries message
341      *
342      * @throws Exception
343      */
344     @Test
345     public void testHandleAppendEntriesCorrectReceiverLogEntries()
346         throws Exception {
347         new JavaTestKit(getSystem()) {{
348
349             MockRaftActorContext context = (MockRaftActorContext)
350                 createActorContext();
351
352             // First set the receivers term to lower number
353             context.getTermInformation().update(2, "test");
354
355             // Prepare the receivers log
356             MockRaftActorContext.SimpleReplicatedLog log =
357                 new MockRaftActorContext.SimpleReplicatedLog();
358             log.append(
359                 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
360             log.append(
361                 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
362             log.append(
363                 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
364
365             context.setReplicatedLog(log);
366
367             // Prepare the entries to be sent with AppendEntries
368             List<ReplicatedLogEntry> entries = new ArrayList<>();
369             entries.add(
370                 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
371             entries.add(
372                 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
373
374             // Send appendEntries with the same term as was set on the receiver
375             // before the new behavior was created (1 in this case)
376             // This will not work for a Candidate because as soon as a Candidate
377             // is created it increments the term
378             AppendEntries appendEntries =
379                 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
380
381             RaftActorBehavior behavior = createBehavior(context);
382
383             // Send an unknown message so that the state of the RaftActor remains unchanged
384             RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
385
386             RaftActorBehavior raftBehavior =
387                 behavior.handleMessage(getRef(), appendEntries);
388
389             assertEquals(expected, raftBehavior);
390
391             // The entry at index 2 will be found out-of-sync with the leader
392             // and will be removed
393             // Then the two new entries will be added to the log
394             // Thus making the log to have 4 entries
395             assertEquals(4, log.last().getIndex() + 1);
396             assertNotNull(log.get(2));
397
398             assertEquals("one", log.get(1).getData().toString());
399
400             // Check that the entry at index 2 has the new data
401             assertEquals("two-1", log.get(2).getData().toString());
402
403             assertEquals("three", log.get(3).getData().toString());
404
405             assertNotNull(log.get(3));
406
407             // Also expect an AppendEntriesReply to be sent where success is false
408             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
409                 "AppendEntriesReply") {
410                 // do not put code outside this method, will run afterwards
411                 protected Boolean match(Object in) {
412                     if (in instanceof AppendEntriesReply) {
413                         AppendEntriesReply reply = (AppendEntriesReply) in;
414                         return reply.isSuccess();
415                     } else {
416                         throw noMatch();
417                     }
418                 }
419             }.get();
420
421             assertEquals(true, out);
422
423
424         }};
425     }
426
427
428     /**
429      * This test verifies that when InstallSnapshot is received by
430      * the follower its applied correctly.
431      *
432      * @throws Exception
433      */
434     @Test
435     public void testHandleInstallSnapshot() throws Exception {
436         JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
437
438             ActorRef leaderActor = getSystem().actorOf(Props.create(
439                 MessageCollectorActor.class));
440
441             MockRaftActorContext context = (MockRaftActorContext)
442                 createActorContext(getRef());
443
444             Follower follower = (Follower)createBehavior(context);
445
446             HashMap<String, String> followerSnapshot = new HashMap<>();
447             followerSnapshot.put("1", "A");
448             followerSnapshot.put("2", "B");
449             followerSnapshot.put("3", "C");
450
451             ByteString bsSnapshot  = toByteString(followerSnapshot);
452             ByteString chunkData = ByteString.EMPTY;
453             int offset = 0;
454             int snapshotLength = bsSnapshot.size();
455             int i = 1;
456
457             do {
458                 chunkData = getNextChunk(bsSnapshot, offset);
459                 final InstallSnapshot installSnapshot =
460                     new InstallSnapshot(1, "leader-1", i, 1,
461                         chunkData, i, 3);
462                 follower.handleMessage(leaderActor, installSnapshot);
463                 offset = offset + 50;
464                 i++;
465             } while ((offset+50) < snapshotLength);
466
467             final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
468             follower.handleMessage(leaderActor, installSnapshot3);
469
470             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
471                 @Override
472                 protected String match(Object o) throws Exception {
473                     if (o instanceof ApplySnapshot) {
474                         ApplySnapshot as = (ApplySnapshot)o;
475                         if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
476                             return "applySnapshot-lastIndex-mismatch";
477                         }
478                         if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
479                             return "applySnapshot-lastAppliedTerm-mismatch";
480                         }
481                         if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
482                             return "applySnapshot-lastAppliedIndex-mismatch";
483                         }
484                         if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
485                             return "applySnapshot-lastTerm-mismatch";
486                         }
487                         return "applySnapshot";
488                     }
489
490                     return "ignoreCase";
491                 }
492             }.get();
493
494             String applySnapshotMatch = "";
495             for (String reply: matches) {
496                 if (reply.startsWith("applySnapshot")) {
497                     applySnapshotMatch = reply;
498                 }
499             }
500
501             assertEquals("applySnapshot", applySnapshotMatch);
502
503             Object messages = executeLocalOperation(leaderActor, "get-all-messages");
504
505             assertNotNull(messages);
506             assertTrue(messages instanceof List);
507             List<Object> listMessages = (List<Object>) messages;
508
509             int installSnapshotReplyReceivedCount = 0;
510             for (Object message: listMessages) {
511                 if (message instanceof InstallSnapshotReply) {
512                     ++installSnapshotReplyReceivedCount;
513                 }
514             }
515
516             assertEquals(3, installSnapshotReplyReceivedCount);
517
518         }};
519     }
520
521     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
522         return MessageCollectorActor.getAllMessages(actor);
523     }
524
525     public ByteString getNextChunk (ByteString bs, int offset){
526         int snapshotLength = bs.size();
527         int start = offset;
528         int size = 50;
529         if (50 > snapshotLength) {
530             size = snapshotLength;
531         } else {
532             if ((start + 50) > snapshotLength) {
533                 size = snapshotLength - start;
534             }
535         }
536         return bs.substring(start, start + size);
537     }
538
539     private ByteString toByteString(Map<String, String> state) {
540         ByteArrayOutputStream b = null;
541         ObjectOutputStream o = null;
542         try {
543             try {
544                 b = new ByteArrayOutputStream();
545                 o = new ObjectOutputStream(b);
546                 o.writeObject(state);
547                 byte[] snapshotBytes = b.toByteArray();
548                 return ByteString.copyFrom(snapshotBytes);
549             } finally {
550                 if (o != null) {
551                     o.flush();
552                     o.close();
553                 }
554                 if (b != null) {
555                     b.close();
556                 }
557             }
558         } catch (IOException e) {
559             org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
560         }
561         return null;
562     }
563 }