1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.testkit.JavaTestKit;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
13 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftActorContext;
15 import org.opendaylight.controller.cluster.raft.RaftState;
16 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
17 import org.opendaylight.controller.cluster.raft.SerializationUtils;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
20 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
21 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
23 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
24 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
25 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
26 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
27 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
28 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
29 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
31 import java.io.ByteArrayOutputStream;
32 import java.io.IOException;
33 import java.io.ObjectOutputStream;
34 import java.util.HashMap;
36 import java.util.concurrent.atomic.AtomicLong;
38 import static org.junit.Assert.assertEquals;
39 import static org.junit.Assert.assertNotNull;
40 import static org.junit.Assert.assertTrue;
42 public class LeaderTest extends AbstractRaftActorBehaviorTest {
44 private ActorRef leaderActor =
45 getSystem().actorOf(Props.create(DoNothingActor.class));
46 private ActorRef senderActor =
47 getSystem().actorOf(Props.create(DoNothingActor.class));
50 public void testHandleMessageForUnknownMessage() throws Exception {
51 new JavaTestKit(getSystem()) {{
53 new Leader(createActorContext());
55 // handle message should return the Leader state when it receives an
57 RaftState state = leader.handleMessage(senderActor, "foo");
58 Assert.assertEquals(RaftState.Leader, state);
64 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
65 new JavaTestKit(getSystem()) {{
67 new Within(duration("1 seconds")) {
68 protected void run() {
70 ActorRef followerActor = getTestActor();
72 MockRaftActorContext actorContext =
73 (MockRaftActorContext) createActorContext();
75 Map<String, String> peerAddresses = new HashMap();
77 peerAddresses.put(followerActor.path().toString(),
78 followerActor.path().toString());
80 actorContext.setPeerAddresses(peerAddresses);
82 Leader leader = new Leader(actorContext);
83 leader.handleMessage(senderActor, new SendHeartBeat());
86 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
87 // do not put code outside this method, will run afterwards
88 protected String match(Object in) {
89 Object msg = fromSerializableMessage(in);
90 if (msg instanceof AppendEntries) {
91 if (((AppendEntries)msg).getTerm() == 0) {
99 }.get(); // this extracts the received message
101 assertEquals("match", out);
109 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
110 new JavaTestKit(getSystem()) {{
112 new Within(duration("1 seconds")) {
113 protected void run() {
115 ActorRef followerActor = getTestActor();
117 MockRaftActorContext actorContext =
118 (MockRaftActorContext) createActorContext();
120 Map<String, String> peerAddresses = new HashMap();
122 peerAddresses.put(followerActor.path().toString(),
123 followerActor.path().toString());
125 actorContext.setPeerAddresses(peerAddresses);
127 Leader leader = new Leader(actorContext);
128 RaftState raftState = leader
129 .handleMessage(senderActor, new Replicate(null, null,
130 new MockRaftActorContext.MockReplicatedLogEntry(1,
132 new MockRaftActorContext.MockPayload("foo"))
135 // State should not change
136 assertEquals(RaftState.Leader, raftState);
139 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
140 // do not put code outside this method, will run afterwards
141 protected String match(Object in) {
142 Object msg = fromSerializableMessage(in);
143 if (msg instanceof AppendEntries) {
144 if (((AppendEntries)msg).getTerm() == 0) {
152 }.get(); // this extracts the received message
154 assertEquals("match", out);
164 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
165 new JavaTestKit(getSystem()) {{
167 new Within(duration("1 seconds")) {
168 protected void run() {
170 ActorRef raftActor = getTestActor();
172 MockRaftActorContext actorContext =
173 new MockRaftActorContext("test", getSystem(), raftActor);
175 actorContext.getReplicatedLog().removeFrom(0);
177 actorContext.setReplicatedLog(
178 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
181 Leader leader = new Leader(actorContext);
182 RaftState raftState = leader
183 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
185 // State should not change
186 assertEquals(RaftState.Leader, raftState);
188 assertEquals(1, actorContext.getCommitIndex());
191 new ExpectMsg<String>(duration("1 seconds"),
193 // do not put code outside this method, will run afterwards
194 protected String match(Object in) {
195 if (in instanceof ApplyState) {
196 if (((ApplyState) in).getIdentifier().equals("state-id")) {
204 }.get(); // this extracts the received message
206 assertEquals("match", out);
214 public void testSendInstallSnapshot() {
215 new LeaderTestKit(getSystem()) {{
217 new Within(duration("1 seconds")) {
218 protected void run() {
219 ActorRef followerActor = getTestActor();
221 Map<String, String> peerAddresses = new HashMap();
222 peerAddresses.put(followerActor.path().toString(),
223 followerActor.path().toString());
226 MockRaftActorContext actorContext =
227 (MockRaftActorContext) createActorContext(getRef());
228 actorContext.setPeerAddresses(peerAddresses);
231 Map<String, String> leadersSnapshot = new HashMap<>();
232 leadersSnapshot.put("1", "A");
233 leadersSnapshot.put("2", "B");
234 leadersSnapshot.put("3", "C");
237 actorContext.getReplicatedLog().removeFrom(0);
239 final int followersLastIndex = 2;
240 final int snapshotIndex = 3;
241 final int newEntryIndex = 4;
242 final int snapshotTerm = 1;
243 final int currentTerm = 2;
245 // set the snapshot variables in replicatedlog
246 actorContext.getReplicatedLog().setSnapshot(
247 toByteString(leadersSnapshot));
248 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
249 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
251 MockLeader leader = new MockLeader(actorContext);
252 // set the follower info in leader
253 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
256 ReplicatedLogImplEntry entry =
257 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
258 new MockRaftActorContext.MockPayload("D"));
260 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
261 RaftState raftState = leader.handleMessage(
262 senderActor, new Replicate(null, "state-id", entry));
264 assertEquals(RaftState.Leader, raftState);
266 // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
267 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
269 protected Boolean match(Object o) throws Exception {
270 if (o instanceof SendInstallSnapshot) {
277 boolean sendInstallSnapshotReceived = false;
278 for (Boolean b: matches) {
279 sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
282 assertTrue(sendInstallSnapshotReceived);
290 public void testInstallSnapshot() {
291 new LeaderTestKit(getSystem()) {{
293 new Within(duration("1 seconds")) {
294 protected void run() {
295 ActorRef followerActor = getTestActor();
297 Map<String, String> peerAddresses = new HashMap();
298 peerAddresses.put(followerActor.path().toString(),
299 followerActor.path().toString());
301 MockRaftActorContext actorContext =
302 (MockRaftActorContext) createActorContext();
303 actorContext.setPeerAddresses(peerAddresses);
306 Map<String, String> leadersSnapshot = new HashMap<>();
307 leadersSnapshot.put("1", "A");
308 leadersSnapshot.put("2", "B");
309 leadersSnapshot.put("3", "C");
312 actorContext.getReplicatedLog().removeFrom(0);
314 final int followersLastIndex = 2;
315 final int snapshotIndex = 3;
316 final int newEntryIndex = 4;
317 final int snapshotTerm = 1;
318 final int currentTerm = 2;
320 // set the snapshot variables in replicatedlog
321 actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
322 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
323 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
325 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
327 MockLeader leader = new MockLeader(actorContext);
328 // set the follower info in leader
329 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
332 ReplicatedLogImplEntry entry =
333 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
334 new MockRaftActorContext.MockPayload("D"));
336 RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
338 assertEquals(RaftState.Leader, raftState);
340 // check if installsnapshot gets called with the correct values.
342 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
343 // do not put code outside this method, will run afterwards
344 protected String match(Object in) {
345 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
346 InstallSnapshot is = (InstallSnapshot)
347 SerializationUtils.fromSerializable(in);
348 if (is.getData() == null) {
349 return "InstallSnapshot data is null";
351 if (is.getLastIncludedIndex() != snapshotIndex) {
352 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
354 if (is.getLastIncludedTerm() != snapshotTerm) {
355 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
357 if (is.getTerm() == currentTerm) {
358 return is.getTerm() + "!=" + currentTerm;
364 return "message mismatch:" + in.getClass();
367 }.get(); // this extracts the received message
369 assertEquals("match", out);
376 public void testHandleInstallSnapshotReplyLastChunk() {
377 new LeaderTestKit(getSystem()) {{
378 new Within(duration("1 seconds")) {
379 protected void run() {
380 ActorRef followerActor = getTestActor();
382 Map<String, String> peerAddresses = new HashMap();
383 peerAddresses.put(followerActor.path().toString(),
384 followerActor.path().toString());
386 MockRaftActorContext actorContext =
387 (MockRaftActorContext) createActorContext();
388 actorContext.setPeerAddresses(peerAddresses);
390 final int followersLastIndex = 2;
391 final int snapshotIndex = 3;
392 final int newEntryIndex = 4;
393 final int snapshotTerm = 1;
394 final int currentTerm = 2;
396 MockLeader leader = new MockLeader(actorContext);
397 // set the follower info in leader
398 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
400 Map<String, String> leadersSnapshot = new HashMap<>();
401 leadersSnapshot.put("1", "A");
402 leadersSnapshot.put("2", "B");
403 leadersSnapshot.put("3", "C");
405 // set the snapshot variables in replicatedlog
406 actorContext.getReplicatedLog().setSnapshot(
407 toByteString(leadersSnapshot));
408 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
409 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
410 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
412 ByteString bs = toByteString(leadersSnapshot);
413 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
414 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
415 leader.getFollowerToSnapshot().getNextChunk();
416 leader.getFollowerToSnapshot().incrementChunkIndex();
420 actorContext.getReplicatedLog().removeFrom(0);
422 RaftState raftState = leader.handleMessage(senderActor,
423 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
424 leader.getFollowerToSnapshot().getChunkIndex(), true));
426 assertEquals(RaftState.Leader, raftState);
428 assertEquals(leader.mapFollowerToSnapshot.size(), 0);
429 assertEquals(leader.followerToLog.size(), 1);
430 assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
431 FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
432 assertEquals(snapshotIndex, fli.getMatchIndex().get());
433 assertEquals(snapshotIndex, fli.getMatchIndex().get());
434 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
441 public void testFollowerToSnapshotLogic() {
443 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
445 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
447 public int getSnapshotChunkSize() {
452 MockLeader leader = new MockLeader(actorContext);
454 Map<String, String> leadersSnapshot = new HashMap<>();
455 leadersSnapshot.put("1", "A");
456 leadersSnapshot.put("2", "B");
457 leadersSnapshot.put("3", "C");
459 ByteString bs = toByteString(leadersSnapshot);
460 byte[] barray = bs.toByteArray();
462 leader.createFollowerToSnapshot("followerId", bs);
463 assertEquals(bs.size(), barray.length);
466 for (int i=0; i < barray.length; i = i + 50) {
470 if (i + 50 > barray.length) {
474 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
475 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
476 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
478 leader.getFollowerToSnapshot().markSendStatus(true);
479 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
480 leader.getFollowerToSnapshot().incrementChunkIndex();
484 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
488 @Override protected RaftActorBehavior createBehavior(
489 RaftActorContext actorContext) {
490 return new Leader(actorContext);
493 @Override protected RaftActorContext createActorContext() {
494 return createActorContext(leaderActor);
497 protected RaftActorContext createActorContext(ActorRef actorRef) {
498 return new MockRaftActorContext("test", getSystem(), actorRef);
501 private ByteString toByteString(Map<String, String> state) {
502 ByteArrayOutputStream b = null;
503 ObjectOutputStream o = null;
506 b = new ByteArrayOutputStream();
507 o = new ObjectOutputStream(b);
508 o.writeObject(state);
509 byte[] snapshotBytes = b.toByteArray();
510 return ByteString.copyFrom(snapshotBytes);
520 } catch (IOException e) {
521 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
526 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
527 private static AbstractRaftActorBehavior behavior;
529 public ForwardMessageToBehaviorActor(){
533 @Override public void onReceive(Object message) throws Exception {
534 super.onReceive(message);
535 behavior.handleMessage(sender(), message);
538 public static void setBehavior(AbstractRaftActorBehavior behavior){
539 ForwardMessageToBehaviorActor.behavior = behavior;
544 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
545 new JavaTestKit(getSystem()) {{
547 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
549 MockRaftActorContext leaderActorContext =
550 new MockRaftActorContext("leader", getSystem(), leaderActor);
552 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
554 MockRaftActorContext followerActorContext =
555 new MockRaftActorContext("follower", getSystem(), followerActor);
557 Follower follower = new Follower(followerActorContext);
559 ForwardMessageToBehaviorActor.setBehavior(follower);
561 Map<String, String> peerAddresses = new HashMap();
562 peerAddresses.put(followerActor.path().toString(),
563 followerActor.path().toString());
565 leaderActorContext.setPeerAddresses(peerAddresses);
567 leaderActorContext.getReplicatedLog().removeFrom(0);
570 leaderActorContext.setReplicatedLog(
571 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
573 leaderActorContext.setCommitIndex(1);
575 followerActorContext.getReplicatedLog().removeFrom(0);
577 // follower too has the exact same log entries and has the same commit index
578 followerActorContext.setReplicatedLog(
579 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
581 followerActorContext.setCommitIndex(1);
583 Leader leader = new Leader(leaderActorContext);
585 leader.handleMessage(leaderActor, new SendHeartBeat());
587 AppendEntriesMessages.AppendEntries appendEntries =
588 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
589 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
591 assertNotNull(appendEntries);
593 assertEquals(1, appendEntries.getLeaderCommit());
594 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
595 assertEquals(0, appendEntries.getPrevLogIndex());
597 AppendEntriesReply appendEntriesReply =
598 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
599 leaderActor, AppendEntriesReply.class);
601 assertNotNull(appendEntriesReply);
603 // follower returns its next index
604 assertEquals(2, appendEntriesReply.getLogLastIndex());
605 assertEquals(1, appendEntriesReply.getLogLastTerm());
612 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
613 new JavaTestKit(getSystem()) {{
615 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
617 MockRaftActorContext leaderActorContext =
618 new MockRaftActorContext("leader", getSystem(), leaderActor);
620 ActorRef followerActor = getSystem().actorOf(
621 Props.create(ForwardMessageToBehaviorActor.class));
623 MockRaftActorContext followerActorContext =
624 new MockRaftActorContext("follower", getSystem(), followerActor);
626 Follower follower = new Follower(followerActorContext);
628 ForwardMessageToBehaviorActor.setBehavior(follower);
630 Map<String, String> peerAddresses = new HashMap();
631 peerAddresses.put(followerActor.path().toString(),
632 followerActor.path().toString());
634 leaderActorContext.setPeerAddresses(peerAddresses);
636 leaderActorContext.getReplicatedLog().removeFrom(0);
638 leaderActorContext.setReplicatedLog(
639 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
641 leaderActorContext.setCommitIndex(1);
643 followerActorContext.getReplicatedLog().removeFrom(0);
645 followerActorContext.setReplicatedLog(
646 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
648 // follower has the same log entries but its commit index > leaders commit index
649 followerActorContext.setCommitIndex(2);
651 Leader leader = new Leader(leaderActorContext);
653 leader.handleMessage(leaderActor, new SendHeartBeat());
655 AppendEntriesMessages.AppendEntries appendEntries =
656 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
657 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
659 assertNotNull(appendEntries);
661 assertEquals(1, appendEntries.getLeaderCommit());
662 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
663 assertEquals(0, appendEntries.getPrevLogIndex());
665 AppendEntriesReply appendEntriesReply =
666 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
667 leaderActor, AppendEntriesReply.class);
669 assertNotNull(appendEntriesReply);
671 assertEquals(2, appendEntriesReply.getLogLastIndex());
672 assertEquals(1, appendEntriesReply.getLogLastTerm());
677 private static class LeaderTestKit extends JavaTestKit {
679 private LeaderTestKit(ActorSystem actorSystem) {
683 protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
684 // Wait for a specific log message to show up
685 final boolean result =
686 new JavaTestKit.EventFilter<Boolean>(logLevel
689 protected Boolean run() {
692 }.from(subject.path().toString())
694 .occurrences(1).exec();
696 Assert.assertEquals(true, result);
701 class MockLeader extends Leader {
703 FollowerToSnapshot fts;
705 public MockLeader(RaftActorContext context){
709 public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
710 FollowerLogInformation followerLogInformation =
711 new FollowerLogInformationImpl(followerId,
712 new AtomicLong(nextIndex),
713 new AtomicLong(matchIndex));
714 followerToLog.put(followerId, followerLogInformation);
717 public FollowerToSnapshot getFollowerToSnapshot() {
721 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
722 fts = new FollowerToSnapshot(bs);
723 mapFollowerToSnapshot.put(followerId, fts);