Merge "BUG-1690: catch wildcard InstanceIdentifiers"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.testkit.JavaTestKit;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
9 import org.junit.Test;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
13 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftActorContext;
15 import org.opendaylight.controller.cluster.raft.RaftState;
16 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
17 import org.opendaylight.controller.cluster.raft.SerializationUtils;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
20 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
21 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
23 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
24 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
25 import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
26 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
27
28 import java.io.ByteArrayOutputStream;
29 import java.io.IOException;
30 import java.io.ObjectOutputStream;
31 import java.util.HashMap;
32 import java.util.Map;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import static org.junit.Assert.assertEquals;
36 import static org.junit.Assert.assertNotNull;
37 import static org.junit.Assert.assertTrue;
38
39 public class LeaderTest extends AbstractRaftActorBehaviorTest {
40
41     private ActorRef leaderActor =
42         getSystem().actorOf(Props.create(DoNothingActor.class));
43     private ActorRef senderActor =
44         getSystem().actorOf(Props.create(DoNothingActor.class));
45
46     @Test
47     public void testHandleMessageForUnknownMessage() throws Exception {
48         new JavaTestKit(getSystem()) {{
49             Leader leader =
50                 new Leader(createActorContext());
51
52             // handle message should return the Leader state when it receives an
53             // unknown message
54             RaftState state = leader.handleMessage(senderActor, "foo");
55             Assert.assertEquals(RaftState.Leader, state);
56         }};
57     }
58
59
60     @Test
61     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
62         new JavaTestKit(getSystem()) {{
63
64             new Within(duration("1 seconds")) {
65                 protected void run() {
66
67                     ActorRef followerActor = getTestActor();
68
69                     MockRaftActorContext actorContext =
70                         (MockRaftActorContext) createActorContext();
71
72                     Map<String, String> peerAddresses = new HashMap();
73
74                     peerAddresses.put(followerActor.path().toString(),
75                         followerActor.path().toString());
76
77                     actorContext.setPeerAddresses(peerAddresses);
78
79                     Leader leader = new Leader(actorContext);
80                     leader.handleMessage(senderActor, new SendHeartBeat());
81
82                     final String out =
83                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
84                             // do not put code outside this method, will run afterwards
85                             protected String match(Object in) {
86                                 Object msg = fromSerializableMessage(in);
87                                 if (msg instanceof AppendEntries) {
88                                     if (((AppendEntries)msg).getTerm() == 0) {
89                                         return "match";
90                                     }
91                                     return null;
92                                 } else {
93                                     throw noMatch();
94                                 }
95                             }
96                         }.get(); // this extracts the received message
97
98                     assertEquals("match", out);
99
100                 }
101             };
102         }};
103     }
104
105     @Test
106     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
107         new JavaTestKit(getSystem()) {{
108
109             new Within(duration("1 seconds")) {
110                 protected void run() {
111
112                     ActorRef followerActor = getTestActor();
113
114                     MockRaftActorContext actorContext =
115                         (MockRaftActorContext) createActorContext();
116
117                     Map<String, String> peerAddresses = new HashMap();
118
119                     peerAddresses.put(followerActor.path().toString(),
120                         followerActor.path().toString());
121
122                     actorContext.setPeerAddresses(peerAddresses);
123
124                     Leader leader = new Leader(actorContext);
125                     RaftState raftState = leader
126                         .handleMessage(senderActor, new Replicate(null, null,
127                             new MockRaftActorContext.MockReplicatedLogEntry(1,
128                                 100,
129                                 new MockRaftActorContext.MockPayload("foo"))
130                         ));
131
132                     // State should not change
133                     assertEquals(RaftState.Leader, raftState);
134
135                     final String out =
136                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
137                             // do not put code outside this method, will run afterwards
138                             protected String match(Object in) {
139                                 Object msg = fromSerializableMessage(in);
140                                 if (msg instanceof AppendEntries) {
141                                     if (((AppendEntries)msg).getTerm() == 0) {
142                                         return "match";
143                                     }
144                                     return null;
145                                 } else {
146                                     throw noMatch();
147                                 }
148                             }
149                         }.get(); // this extracts the received message
150
151                     assertEquals("match", out);
152
153                 }
154
155
156             };
157         }};
158     }
159
160     @Test
161     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
162         new JavaTestKit(getSystem()) {{
163
164             new Within(duration("1 seconds")) {
165                 protected void run() {
166
167                     ActorRef raftActor = getTestActor();
168
169                     MockRaftActorContext actorContext =
170                         new MockRaftActorContext("test", getSystem(), raftActor);
171
172                     actorContext.getReplicatedLog().removeFrom(0);
173
174                     actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(0, 1,
175                         new MockRaftActorContext.MockPayload("foo")));
176
177                     ReplicatedLogImplEntry entry =
178                         new ReplicatedLogImplEntry(1, 1,
179                             new MockRaftActorContext.MockPayload("foo"));
180
181                     actorContext.getReplicatedLog().append(entry);
182
183                     Leader leader = new Leader(actorContext);
184                     RaftState raftState = leader
185                         .handleMessage(senderActor, new Replicate(null, "state-id",entry));
186
187                     // State should not change
188                     assertEquals(RaftState.Leader, raftState);
189
190                     assertEquals(1, actorContext.getCommitIndex());
191
192                     final String out =
193                         new ExpectMsg<String>(duration("1 seconds"),
194                             "match hint") {
195                             // do not put code outside this method, will run afterwards
196                             protected String match(Object in) {
197                                 if (in instanceof ApplyState) {
198                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
199                                         return "match";
200                                     }
201                                     return null;
202                                 } else {
203                                     throw noMatch();
204                                 }
205                             }
206                         }.get(); // this extracts the received message
207
208                     assertEquals("match", out);
209
210                 }
211             };
212         }};
213     }
214
215     @Test
216     public void testSendInstallSnapshot() {
217         new LeaderTestKit(getSystem()) {{
218
219             new Within(duration("1 seconds")) {
220                 protected void run() {
221                     ActorRef followerActor = getTestActor();
222
223                     Map<String, String> peerAddresses = new HashMap();
224                     peerAddresses.put(followerActor.path().toString(),
225                         followerActor.path().toString());
226
227
228                     MockRaftActorContext actorContext =
229                         (MockRaftActorContext) createActorContext(getRef());
230                     actorContext.setPeerAddresses(peerAddresses);
231
232
233                     Map<String, String> leadersSnapshot = new HashMap<>();
234                     leadersSnapshot.put("1", "A");
235                     leadersSnapshot.put("2", "B");
236                     leadersSnapshot.put("3", "C");
237
238                     //clears leaders log
239                     actorContext.getReplicatedLog().removeFrom(0);
240
241                     final int followersLastIndex = 2;
242                     final int snapshotIndex = 3;
243                     final int newEntryIndex = 4;
244                     final int snapshotTerm = 1;
245                     final int currentTerm = 2;
246
247                     // set the snapshot variables in replicatedlog
248                     actorContext.getReplicatedLog().setSnapshot(
249                         toByteString(leadersSnapshot));
250                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
251                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
252
253                     MockLeader leader = new MockLeader(actorContext);
254                     // set the follower info in leader
255                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
256
257                     // new entry
258                     ReplicatedLogImplEntry entry =
259                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
260                             new MockRaftActorContext.MockPayload("D"));
261
262                     // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
263                     RaftState raftState = leader.handleMessage(
264                         senderActor, new Replicate(null, "state-id", entry));
265
266                     assertEquals(RaftState.Leader, raftState);
267
268                     // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
269                     Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
270                         @Override
271                         protected Boolean match(Object o) throws Exception {
272                             if (o instanceof SendInstallSnapshot) {
273                                 return true;
274                             }
275                             return false;
276                         }
277                     }.get();
278
279                     boolean sendInstallSnapshotReceived = false;
280                     for (Boolean b: matches) {
281                         sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
282                     }
283
284                     assertTrue(sendInstallSnapshotReceived);
285
286                 }
287             };
288         }};
289     }
290
291     @Test
292     public void testInstallSnapshot() {
293         new LeaderTestKit(getSystem()) {{
294
295             new Within(duration("1 seconds")) {
296                 protected void run() {
297                     ActorRef followerActor = getTestActor();
298
299                     Map<String, String> peerAddresses = new HashMap();
300                     peerAddresses.put(followerActor.path().toString(),
301                         followerActor.path().toString());
302
303                     MockRaftActorContext actorContext =
304                         (MockRaftActorContext) createActorContext();
305                     actorContext.setPeerAddresses(peerAddresses);
306
307
308                     Map<String, String> leadersSnapshot = new HashMap<>();
309                     leadersSnapshot.put("1", "A");
310                     leadersSnapshot.put("2", "B");
311                     leadersSnapshot.put("3", "C");
312
313                     //clears leaders log
314                     actorContext.getReplicatedLog().removeFrom(0);
315
316                     final int followersLastIndex = 2;
317                     final int snapshotIndex = 3;
318                     final int newEntryIndex = 4;
319                     final int snapshotTerm = 1;
320                     final int currentTerm = 2;
321
322                     // set the snapshot variables in replicatedlog
323                     actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
324                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
325                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
326
327                     actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
328
329                     MockLeader leader = new MockLeader(actorContext);
330                     // set the follower info in leader
331                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
332
333                     // new entry
334                     ReplicatedLogImplEntry entry =
335                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
336                             new MockRaftActorContext.MockPayload("D"));
337
338
339                     RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
340
341                     assertEquals(RaftState.Leader, raftState);
342
343                     // check if installsnapshot gets called with the correct values.
344                     final String out =
345                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
346                             // do not put code outside this method, will run afterwards
347                             protected String match(Object in) {
348                                 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
349                                     InstallSnapshot is = (InstallSnapshot)
350                                         SerializationUtils.fromSerializable(in);
351                                     if (is.getData() == null) {
352                                         return "InstallSnapshot data is null";
353                                     }
354                                     if (is.getLastIncludedIndex() != snapshotIndex) {
355                                         return is.getLastIncludedIndex() + "!=" + snapshotIndex;
356                                     }
357                                     if (is.getLastIncludedTerm() != snapshotTerm) {
358                                         return is.getLastIncludedTerm() + "!=" + snapshotTerm;
359                                     }
360                                     if (is.getTerm() == currentTerm) {
361                                         return is.getTerm() + "!=" + currentTerm;
362                                     }
363
364                                     return "match";
365
366                                } else {
367                                     return "message mismatch:" + in.getClass();
368                                 }
369                             }
370                         }.get(); // this extracts the received message
371
372                     assertEquals("match", out);
373                 }
374             };
375         }};
376     }
377
378     @Test
379     public void testHandleInstallSnapshotReplyLastChunk() {
380         new LeaderTestKit(getSystem()) {{
381             new Within(duration("1 seconds")) {
382                 protected void run() {
383                     ActorRef followerActor = getTestActor();
384
385                     Map<String, String> peerAddresses = new HashMap();
386                     peerAddresses.put(followerActor.path().toString(),
387                         followerActor.path().toString());
388
389                     MockRaftActorContext actorContext =
390                         (MockRaftActorContext) createActorContext();
391                     actorContext.setPeerAddresses(peerAddresses);
392
393                     final int followersLastIndex = 2;
394                     final int snapshotIndex = 3;
395                     final int newEntryIndex = 4;
396                     final int snapshotTerm = 1;
397                     final int currentTerm = 2;
398
399                     MockLeader leader = new MockLeader(actorContext);
400                     // set the follower info in leader
401                     leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
402
403                     Map<String, String> leadersSnapshot = new HashMap<>();
404                     leadersSnapshot.put("1", "A");
405                     leadersSnapshot.put("2", "B");
406                     leadersSnapshot.put("3", "C");
407
408                     // set the snapshot variables in replicatedlog
409                     actorContext.getReplicatedLog().setSnapshot(
410                         toByteString(leadersSnapshot));
411                     actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
412                     actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
413                     actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
414
415                     ByteString bs = toByteString(leadersSnapshot);
416                     leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
417                     while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
418                         leader.getFollowerToSnapshot().getNextChunk();
419                         leader.getFollowerToSnapshot().incrementChunkIndex();
420                     }
421
422                     //clears leaders log
423                     actorContext.getReplicatedLog().removeFrom(0);
424
425                     RaftState raftState = leader.handleMessage(senderActor,
426                         new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
427                             leader.getFollowerToSnapshot().getChunkIndex(), true));
428
429                     assertEquals(RaftState.Leader, raftState);
430
431                     assertEquals(leader.mapFollowerToSnapshot.size(), 0);
432                     assertEquals(leader.followerToLog.size(), 1);
433                     assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
434                     FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
435                     assertEquals(snapshotIndex, fli.getMatchIndex().get());
436                     assertEquals(snapshotIndex, fli.getMatchIndex().get());
437                     assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
438                 }
439             };
440         }};
441     }
442
443     @Test
444     public void testFollowerToSnapshotLogic() {
445
446         MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
447
448         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
449             @Override
450             public int getSnapshotChunkSize() {
451                 return 50;
452             }
453         });
454
455         MockLeader leader = new MockLeader(actorContext);
456
457         Map<String, String> leadersSnapshot = new HashMap<>();
458         leadersSnapshot.put("1", "A");
459         leadersSnapshot.put("2", "B");
460         leadersSnapshot.put("3", "C");
461
462         ByteString bs = toByteString(leadersSnapshot);
463         byte[] barray = bs.toByteArray();
464
465         leader.createFollowerToSnapshot("followerId", bs);
466         assertEquals(bs.size(), barray.length);
467
468         int chunkIndex=0;
469         for (int i=0; i < barray.length; i = i + 50) {
470             int j = i + 50;
471             chunkIndex++;
472
473             if (i + 50 > barray.length) {
474                 j = barray.length;
475             }
476
477             ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
478             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
479             assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
480
481             leader.getFollowerToSnapshot().markSendStatus(true);
482             if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
483                 leader.getFollowerToSnapshot().incrementChunkIndex();
484             }
485         }
486
487         assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
488     }
489
490
491     @Override protected RaftActorBehavior createBehavior(
492         RaftActorContext actorContext) {
493         return new Leader(actorContext);
494     }
495
496     @Override protected RaftActorContext createActorContext() {
497         return createActorContext(leaderActor);
498     }
499
500     protected RaftActorContext createActorContext(ActorRef actorRef) {
501         return new MockRaftActorContext("test", getSystem(), actorRef);
502     }
503
504     private ByteString toByteString(Map<String, String> state) {
505         ByteArrayOutputStream b = null;
506         ObjectOutputStream o = null;
507         try {
508             try {
509                 b = new ByteArrayOutputStream();
510                 o = new ObjectOutputStream(b);
511                 o.writeObject(state);
512                 byte[] snapshotBytes = b.toByteArray();
513                 return ByteString.copyFrom(snapshotBytes);
514             } finally {
515                 if (o != null) {
516                     o.flush();
517                     o.close();
518                 }
519                 if (b != null) {
520                     b.close();
521                 }
522             }
523         } catch (IOException e) {
524             Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
525         }
526         return null;
527     }
528
529     private static class LeaderTestKit extends JavaTestKit {
530
531         private LeaderTestKit(ActorSystem actorSystem) {
532             super(actorSystem);
533         }
534
535         protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
536             // Wait for a specific log message to show up
537             final boolean result =
538             new JavaTestKit.EventFilter<Boolean>(logLevel
539             ) {
540                 @Override
541                 protected Boolean run() {
542                     return true;
543                 }
544             }.from(subject.path().toString())
545                 .message(logMessage)
546                 .occurrences(1).exec();
547
548             Assert.assertEquals(true, result);
549
550         }
551     }
552
553     class MockLeader extends Leader {
554
555         FollowerToSnapshot fts;
556
557         public MockLeader(RaftActorContext context){
558             super(context);
559         }
560
561         public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
562             FollowerLogInformation followerLogInformation =
563                 new FollowerLogInformationImpl(followerId,
564                     new AtomicLong(nextIndex),
565                     new AtomicLong(matchIndex));
566             followerToLog.put(followerId, followerLogInformation);
567         }
568
569         public FollowerToSnapshot getFollowerToSnapshot() {
570             return fts;
571         }
572
573         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
574             fts = new FollowerToSnapshot(bs);
575             mapFollowerToSnapshot.put(followerId, fts);
576
577         }
578     }
579 }