1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.testkit.JavaTestKit;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
13 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftActorContext;
15 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
16 import org.opendaylight.controller.cluster.raft.SerializationUtils;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
18 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
19 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
20 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
23 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
24 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
25 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
26 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
27 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
28 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
30 import java.io.ByteArrayOutputStream;
31 import java.io.IOException;
32 import java.io.ObjectOutputStream;
33 import java.util.HashMap;
35 import java.util.concurrent.atomic.AtomicLong;
37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertNotNull;
39 import static org.junit.Assert.assertTrue;
41 public class LeaderTest extends AbstractRaftActorBehaviorTest {
43 private ActorRef leaderActor =
44 getSystem().actorOf(Props.create(DoNothingActor.class));
45 private ActorRef senderActor =
46 getSystem().actorOf(Props.create(DoNothingActor.class));
49 public void testHandleMessageForUnknownMessage() throws Exception {
50 new JavaTestKit(getSystem()) {{
52 new Leader(createActorContext());
54 // handle message should return the Leader state when it receives an
56 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
57 Assert.assertTrue(behavior instanceof Leader);
63 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
64 new JavaTestKit(getSystem()) {{
66 new Within(duration("1 seconds")) {
67 protected void run() {
69 ActorRef followerActor = getTestActor();
71 MockRaftActorContext actorContext =
72 (MockRaftActorContext) createActorContext();
74 Map<String, String> peerAddresses = new HashMap();
76 peerAddresses.put(followerActor.path().toString(),
77 followerActor.path().toString());
79 actorContext.setPeerAddresses(peerAddresses);
81 Leader leader = new Leader(actorContext);
82 leader.handleMessage(senderActor, new SendHeartBeat());
85 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
86 // do not put code outside this method, will run afterwards
87 protected String match(Object in) {
88 Object msg = fromSerializableMessage(in);
89 if (msg instanceof AppendEntries) {
90 if (((AppendEntries)msg).getTerm() == 0) {
98 }.get(); // this extracts the received message
100 assertEquals("match", out);
108 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
109 new JavaTestKit(getSystem()) {{
111 new Within(duration("1 seconds")) {
112 protected void run() {
114 ActorRef followerActor = getTestActor();
116 MockRaftActorContext actorContext =
117 (MockRaftActorContext) createActorContext();
119 Map<String, String> peerAddresses = new HashMap();
121 peerAddresses.put(followerActor.path().toString(),
122 followerActor.path().toString());
124 actorContext.setPeerAddresses(peerAddresses);
126 Leader leader = new Leader(actorContext);
127 RaftActorBehavior raftBehavior = leader
128 .handleMessage(senderActor, new Replicate(null, null,
129 new MockRaftActorContext.MockReplicatedLogEntry(1,
131 new MockRaftActorContext.MockPayload("foo"))
134 // State should not change
135 assertTrue(raftBehavior instanceof Leader);
138 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
139 // do not put code outside this method, will run afterwards
140 protected String match(Object in) {
141 Object msg = fromSerializableMessage(in);
142 if (msg instanceof AppendEntries) {
143 if (((AppendEntries)msg).getTerm() == 0) {
151 }.get(); // this extracts the received message
153 assertEquals("match", out);
163 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
164 new JavaTestKit(getSystem()) {{
166 new Within(duration("1 seconds")) {
167 protected void run() {
169 ActorRef raftActor = getTestActor();
171 MockRaftActorContext actorContext =
172 new MockRaftActorContext("test", getSystem(), raftActor);
174 actorContext.getReplicatedLog().removeFrom(0);
176 actorContext.setReplicatedLog(
177 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
180 Leader leader = new Leader(actorContext);
181 RaftActorBehavior raftBehavior = leader
182 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
184 // State should not change
185 assertTrue(raftBehavior instanceof Leader);
187 assertEquals(1, actorContext.getCommitIndex());
190 new ExpectMsg<String>(duration("1 seconds"),
192 // do not put code outside this method, will run afterwards
193 protected String match(Object in) {
194 if (in instanceof ApplyState) {
195 if (((ApplyState) in).getIdentifier().equals("state-id")) {
203 }.get(); // this extracts the received message
205 assertEquals("match", out);
213 public void testSendInstallSnapshot() {
214 new LeaderTestKit(getSystem()) {{
216 new Within(duration("1 seconds")) {
217 protected void run() {
218 ActorRef followerActor = getTestActor();
220 Map<String, String> peerAddresses = new HashMap();
221 peerAddresses.put(followerActor.path().toString(),
222 followerActor.path().toString());
225 MockRaftActorContext actorContext =
226 (MockRaftActorContext) createActorContext(getRef());
227 actorContext.setPeerAddresses(peerAddresses);
230 Map<String, String> leadersSnapshot = new HashMap<>();
231 leadersSnapshot.put("1", "A");
232 leadersSnapshot.put("2", "B");
233 leadersSnapshot.put("3", "C");
236 actorContext.getReplicatedLog().removeFrom(0);
238 final int followersLastIndex = 2;
239 final int snapshotIndex = 3;
240 final int newEntryIndex = 4;
241 final int snapshotTerm = 1;
242 final int currentTerm = 2;
244 // set the snapshot variables in replicatedlog
245 actorContext.getReplicatedLog().setSnapshot(
246 toByteString(leadersSnapshot));
247 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
248 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
250 MockLeader leader = new MockLeader(actorContext);
251 // set the follower info in leader
252 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
255 ReplicatedLogImplEntry entry =
256 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
257 new MockRaftActorContext.MockPayload("D"));
259 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
260 RaftActorBehavior raftBehavior = leader.handleMessage(
261 senderActor, new Replicate(null, "state-id", entry));
263 assertTrue(raftBehavior instanceof Leader);
265 // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
266 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
268 protected Boolean match(Object o) throws Exception {
269 if (o instanceof SendInstallSnapshot) {
276 boolean sendInstallSnapshotReceived = false;
277 for (Boolean b: matches) {
278 sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
281 assertTrue(sendInstallSnapshotReceived);
289 public void testInstallSnapshot() {
290 new LeaderTestKit(getSystem()) {{
292 new Within(duration("1 seconds")) {
293 protected void run() {
294 ActorRef followerActor = getTestActor();
296 Map<String, String> peerAddresses = new HashMap();
297 peerAddresses.put(followerActor.path().toString(),
298 followerActor.path().toString());
300 MockRaftActorContext actorContext =
301 (MockRaftActorContext) createActorContext();
302 actorContext.setPeerAddresses(peerAddresses);
305 Map<String, String> leadersSnapshot = new HashMap<>();
306 leadersSnapshot.put("1", "A");
307 leadersSnapshot.put("2", "B");
308 leadersSnapshot.put("3", "C");
311 actorContext.getReplicatedLog().removeFrom(0);
313 final int followersLastIndex = 2;
314 final int snapshotIndex = 3;
315 final int newEntryIndex = 4;
316 final int snapshotTerm = 1;
317 final int currentTerm = 2;
319 // set the snapshot variables in replicatedlog
320 actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
321 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
322 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
324 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
326 MockLeader leader = new MockLeader(actorContext);
327 // set the follower info in leader
328 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
331 ReplicatedLogImplEntry entry =
332 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
333 new MockRaftActorContext.MockPayload("D"));
335 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
337 assertTrue(raftBehavior instanceof Leader);
339 // check if installsnapshot gets called with the correct values.
341 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
342 // do not put code outside this method, will run afterwards
343 protected String match(Object in) {
344 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
345 InstallSnapshot is = (InstallSnapshot)
346 SerializationUtils.fromSerializable(in);
347 if (is.getData() == null) {
348 return "InstallSnapshot data is null";
350 if (is.getLastIncludedIndex() != snapshotIndex) {
351 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
353 if (is.getLastIncludedTerm() != snapshotTerm) {
354 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
356 if (is.getTerm() == currentTerm) {
357 return is.getTerm() + "!=" + currentTerm;
363 return "message mismatch:" + in.getClass();
366 }.get(); // this extracts the received message
368 assertEquals("match", out);
375 public void testHandleInstallSnapshotReplyLastChunk() {
376 new LeaderTestKit(getSystem()) {{
377 new Within(duration("1 seconds")) {
378 protected void run() {
379 ActorRef followerActor = getTestActor();
381 Map<String, String> peerAddresses = new HashMap();
382 peerAddresses.put(followerActor.path().toString(),
383 followerActor.path().toString());
385 MockRaftActorContext actorContext =
386 (MockRaftActorContext) createActorContext();
387 actorContext.setPeerAddresses(peerAddresses);
389 final int followersLastIndex = 2;
390 final int snapshotIndex = 3;
391 final int newEntryIndex = 4;
392 final int snapshotTerm = 1;
393 final int currentTerm = 2;
395 MockLeader leader = new MockLeader(actorContext);
396 // set the follower info in leader
397 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
399 Map<String, String> leadersSnapshot = new HashMap<>();
400 leadersSnapshot.put("1", "A");
401 leadersSnapshot.put("2", "B");
402 leadersSnapshot.put("3", "C");
404 // set the snapshot variables in replicatedlog
405 actorContext.getReplicatedLog().setSnapshot(
406 toByteString(leadersSnapshot));
407 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
408 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
409 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
411 ByteString bs = toByteString(leadersSnapshot);
412 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
413 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
414 leader.getFollowerToSnapshot().getNextChunk();
415 leader.getFollowerToSnapshot().incrementChunkIndex();
419 actorContext.getReplicatedLog().removeFrom(0);
421 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
422 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
423 leader.getFollowerToSnapshot().getChunkIndex(), true));
425 assertTrue(raftBehavior instanceof Leader);
427 assertEquals(leader.mapFollowerToSnapshot.size(), 0);
428 assertEquals(leader.followerToLog.size(), 1);
429 assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
430 FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
431 assertEquals(snapshotIndex, fli.getMatchIndex().get());
432 assertEquals(snapshotIndex, fli.getMatchIndex().get());
433 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
440 public void testFollowerToSnapshotLogic() {
442 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
444 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
446 public int getSnapshotChunkSize() {
451 MockLeader leader = new MockLeader(actorContext);
453 Map<String, String> leadersSnapshot = new HashMap<>();
454 leadersSnapshot.put("1", "A");
455 leadersSnapshot.put("2", "B");
456 leadersSnapshot.put("3", "C");
458 ByteString bs = toByteString(leadersSnapshot);
459 byte[] barray = bs.toByteArray();
461 leader.createFollowerToSnapshot("followerId", bs);
462 assertEquals(bs.size(), barray.length);
465 for (int i=0; i < barray.length; i = i + 50) {
469 if (i + 50 > barray.length) {
473 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
474 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
475 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
477 leader.getFollowerToSnapshot().markSendStatus(true);
478 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
479 leader.getFollowerToSnapshot().incrementChunkIndex();
483 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
487 @Override protected RaftActorBehavior createBehavior(
488 RaftActorContext actorContext) {
489 return new Leader(actorContext);
492 @Override protected RaftActorContext createActorContext() {
493 return createActorContext(leaderActor);
496 protected RaftActorContext createActorContext(ActorRef actorRef) {
497 return new MockRaftActorContext("test", getSystem(), actorRef);
500 private ByteString toByteString(Map<String, String> state) {
501 ByteArrayOutputStream b = null;
502 ObjectOutputStream o = null;
505 b = new ByteArrayOutputStream();
506 o = new ObjectOutputStream(b);
507 o.writeObject(state);
508 byte[] snapshotBytes = b.toByteArray();
509 return ByteString.copyFrom(snapshotBytes);
519 } catch (IOException e) {
520 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
525 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
526 private static AbstractRaftActorBehavior behavior;
528 public ForwardMessageToBehaviorActor(){
532 @Override public void onReceive(Object message) throws Exception {
533 super.onReceive(message);
534 behavior.handleMessage(sender(), message);
537 public static void setBehavior(AbstractRaftActorBehavior behavior){
538 ForwardMessageToBehaviorActor.behavior = behavior;
543 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
544 new JavaTestKit(getSystem()) {{
546 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
548 MockRaftActorContext leaderActorContext =
549 new MockRaftActorContext("leader", getSystem(), leaderActor);
551 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
553 MockRaftActorContext followerActorContext =
554 new MockRaftActorContext("follower", getSystem(), followerActor);
556 Follower follower = new Follower(followerActorContext);
558 ForwardMessageToBehaviorActor.setBehavior(follower);
560 Map<String, String> peerAddresses = new HashMap();
561 peerAddresses.put(followerActor.path().toString(),
562 followerActor.path().toString());
564 leaderActorContext.setPeerAddresses(peerAddresses);
566 leaderActorContext.getReplicatedLog().removeFrom(0);
569 leaderActorContext.setReplicatedLog(
570 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
572 leaderActorContext.setCommitIndex(1);
574 followerActorContext.getReplicatedLog().removeFrom(0);
576 // follower too has the exact same log entries and has the same commit index
577 followerActorContext.setReplicatedLog(
578 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
580 followerActorContext.setCommitIndex(1);
582 Leader leader = new Leader(leaderActorContext);
584 leader.handleMessage(leaderActor, new SendHeartBeat());
586 AppendEntriesMessages.AppendEntries appendEntries =
587 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
588 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
590 assertNotNull(appendEntries);
592 assertEquals(1, appendEntries.getLeaderCommit());
593 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
594 assertEquals(0, appendEntries.getPrevLogIndex());
596 AppendEntriesReply appendEntriesReply =
597 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
598 leaderActor, AppendEntriesReply.class);
600 assertNotNull(appendEntriesReply);
602 // follower returns its next index
603 assertEquals(2, appendEntriesReply.getLogLastIndex());
604 assertEquals(1, appendEntriesReply.getLogLastTerm());
611 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
612 new JavaTestKit(getSystem()) {{
614 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
616 MockRaftActorContext leaderActorContext =
617 new MockRaftActorContext("leader", getSystem(), leaderActor);
619 ActorRef followerActor = getSystem().actorOf(
620 Props.create(ForwardMessageToBehaviorActor.class));
622 MockRaftActorContext followerActorContext =
623 new MockRaftActorContext("follower", getSystem(), followerActor);
625 Follower follower = new Follower(followerActorContext);
627 ForwardMessageToBehaviorActor.setBehavior(follower);
629 Map<String, String> peerAddresses = new HashMap();
630 peerAddresses.put(followerActor.path().toString(),
631 followerActor.path().toString());
633 leaderActorContext.setPeerAddresses(peerAddresses);
635 leaderActorContext.getReplicatedLog().removeFrom(0);
637 leaderActorContext.setReplicatedLog(
638 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
640 leaderActorContext.setCommitIndex(1);
642 followerActorContext.getReplicatedLog().removeFrom(0);
644 followerActorContext.setReplicatedLog(
645 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
647 // follower has the same log entries but its commit index > leaders commit index
648 followerActorContext.setCommitIndex(2);
650 Leader leader = new Leader(leaderActorContext);
652 leader.handleMessage(leaderActor, new SendHeartBeat());
654 AppendEntriesMessages.AppendEntries appendEntries =
655 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
656 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
658 assertNotNull(appendEntries);
660 assertEquals(1, appendEntries.getLeaderCommit());
661 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
662 assertEquals(0, appendEntries.getPrevLogIndex());
664 AppendEntriesReply appendEntriesReply =
665 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
666 leaderActor, AppendEntriesReply.class);
668 assertNotNull(appendEntriesReply);
670 assertEquals(2, appendEntriesReply.getLogLastIndex());
671 assertEquals(1, appendEntriesReply.getLogLastTerm());
676 private static class LeaderTestKit extends JavaTestKit {
678 private LeaderTestKit(ActorSystem actorSystem) {
682 protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
683 // Wait for a specific log message to show up
684 final boolean result =
685 new JavaTestKit.EventFilter<Boolean>(logLevel
688 protected Boolean run() {
691 }.from(subject.path().toString())
693 .occurrences(1).exec();
695 Assert.assertEquals(true, result);
700 class MockLeader extends Leader {
702 FollowerToSnapshot fts;
704 public MockLeader(RaftActorContext context){
708 public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
709 FollowerLogInformation followerLogInformation =
710 new FollowerLogInformationImpl(followerId,
711 new AtomicLong(nextIndex),
712 new AtomicLong(matchIndex));
713 followerToLog.put(followerId, followerLogInformation);
716 public FollowerToSnapshot getFollowerToSnapshot() {
720 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
721 fts = new FollowerToSnapshot(bs);
722 mapFollowerToSnapshot.put(followerId, fts);