1 package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
45 public class LeaderTest extends AbstractRaftActorBehaviorTest {
47 private ActorRef leaderActor =
48 getSystem().actorOf(Props.create(DoNothingActor.class));
49 private ActorRef senderActor =
50 getSystem().actorOf(Props.create(DoNothingActor.class));
53 public void testHandleMessageForUnknownMessage() throws Exception {
54 new JavaTestKit(getSystem()) {{
56 new Leader(createActorContext());
58 // handle message should return the Leader state when it receives an
60 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
61 Assert.assertTrue(behavior instanceof Leader);
67 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
68 new JavaTestKit(getSystem()) {{
70 new Within(duration("1 seconds")) {
71 protected void run() {
73 ActorRef followerActor = getTestActor();
75 MockRaftActorContext actorContext =
76 (MockRaftActorContext) createActorContext();
78 Map<String, String> peerAddresses = new HashMap();
80 peerAddresses.put(followerActor.path().toString(),
81 followerActor.path().toString());
83 actorContext.setPeerAddresses(peerAddresses);
85 Leader leader = new Leader(actorContext);
86 leader.handleMessage(senderActor, new SendHeartBeat());
89 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
90 // do not put code outside this method, will run afterwards
91 protected String match(Object in) {
92 Object msg = fromSerializableMessage(in);
93 if (msg instanceof AppendEntries) {
94 if (((AppendEntries)msg).getTerm() == 0) {
102 }.get(); // this extracts the received message
104 assertEquals("match", out);
112 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
113 new JavaTestKit(getSystem()) {{
115 new Within(duration("1 seconds")) {
116 protected void run() {
118 ActorRef followerActor = getTestActor();
120 MockRaftActorContext actorContext =
121 (MockRaftActorContext) createActorContext();
123 Map<String, String> peerAddresses = new HashMap();
125 peerAddresses.put(followerActor.path().toString(),
126 followerActor.path().toString());
128 actorContext.setPeerAddresses(peerAddresses);
130 Leader leader = new Leader(actorContext);
131 RaftActorBehavior raftBehavior = leader
132 .handleMessage(senderActor, new Replicate(null, null,
133 new MockRaftActorContext.MockReplicatedLogEntry(1,
135 new MockRaftActorContext.MockPayload("foo"))
138 // State should not change
139 assertTrue(raftBehavior instanceof Leader);
142 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
143 // do not put code outside this method, will run afterwards
144 protected String match(Object in) {
145 Object msg = fromSerializableMessage(in);
146 if (msg instanceof AppendEntries) {
147 if (((AppendEntries)msg).getTerm() == 0) {
155 }.get(); // this extracts the received message
157 assertEquals("match", out);
167 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
168 new JavaTestKit(getSystem()) {{
170 new Within(duration("1 seconds")) {
171 protected void run() {
173 ActorRef raftActor = getTestActor();
175 MockRaftActorContext actorContext =
176 new MockRaftActorContext("test", getSystem(), raftActor);
178 actorContext.getReplicatedLog().removeFrom(0);
180 actorContext.setReplicatedLog(
181 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
184 Leader leader = new Leader(actorContext);
185 RaftActorBehavior raftBehavior = leader
186 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
188 // State should not change
189 assertTrue(raftBehavior instanceof Leader);
191 assertEquals(1, actorContext.getCommitIndex());
194 new ExpectMsg<String>(duration("1 seconds"),
196 // do not put code outside this method, will run afterwards
197 protected String match(Object in) {
198 if (in instanceof ApplyState) {
199 if (((ApplyState) in).getIdentifier().equals("state-id")) {
207 }.get(); // this extracts the received message
209 assertEquals("match", out);
217 public void testSendInstallSnapshot() {
218 new LeaderTestKit(getSystem()) {{
220 new Within(duration("1 seconds")) {
221 protected void run() {
222 ActorRef followerActor = getTestActor();
224 Map<String, String> peerAddresses = new HashMap();
225 peerAddresses.put(followerActor.path().toString(),
226 followerActor.path().toString());
229 MockRaftActorContext actorContext =
230 (MockRaftActorContext) createActorContext(getRef());
231 actorContext.setPeerAddresses(peerAddresses);
234 Map<String, String> leadersSnapshot = new HashMap<>();
235 leadersSnapshot.put("1", "A");
236 leadersSnapshot.put("2", "B");
237 leadersSnapshot.put("3", "C");
240 actorContext.getReplicatedLog().removeFrom(0);
242 final int followersLastIndex = 2;
243 final int snapshotIndex = 3;
244 final int newEntryIndex = 4;
245 final int snapshotTerm = 1;
246 final int currentTerm = 2;
248 // set the snapshot variables in replicatedlog
249 actorContext.getReplicatedLog().setSnapshot(
250 toByteString(leadersSnapshot));
251 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
252 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
254 MockLeader leader = new MockLeader(actorContext);
255 // set the follower info in leader
256 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
259 ReplicatedLogImplEntry entry =
260 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
261 new MockRaftActorContext.MockPayload("D"));
263 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
264 RaftActorBehavior raftBehavior = leader.handleMessage(
265 senderActor, new Replicate(null, "state-id", entry));
267 assertTrue(raftBehavior instanceof Leader);
269 // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
270 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
272 protected Boolean match(Object o) throws Exception {
273 if (o instanceof SendInstallSnapshot) {
280 boolean sendInstallSnapshotReceived = false;
281 for (Boolean b: matches) {
282 sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
285 assertTrue(sendInstallSnapshotReceived);
293 public void testInstallSnapshot() {
294 new LeaderTestKit(getSystem()) {{
296 new Within(duration("1 seconds")) {
297 protected void run() {
298 ActorRef followerActor = getTestActor();
300 Map<String, String> peerAddresses = new HashMap();
301 peerAddresses.put(followerActor.path().toString(),
302 followerActor.path().toString());
304 MockRaftActorContext actorContext =
305 (MockRaftActorContext) createActorContext();
306 actorContext.setPeerAddresses(peerAddresses);
309 Map<String, String> leadersSnapshot = new HashMap<>();
310 leadersSnapshot.put("1", "A");
311 leadersSnapshot.put("2", "B");
312 leadersSnapshot.put("3", "C");
315 actorContext.getReplicatedLog().removeFrom(0);
317 final int followersLastIndex = 2;
318 final int snapshotIndex = 3;
319 final int newEntryIndex = 4;
320 final int snapshotTerm = 1;
321 final int currentTerm = 2;
323 // set the snapshot variables in replicatedlog
324 actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
325 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
326 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
328 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
330 MockLeader leader = new MockLeader(actorContext);
331 // set the follower info in leader
332 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
335 ReplicatedLogImplEntry entry =
336 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
337 new MockRaftActorContext.MockPayload("D"));
339 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
341 assertTrue(raftBehavior instanceof Leader);
343 // check if installsnapshot gets called with the correct values.
345 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
346 // do not put code outside this method, will run afterwards
347 protected String match(Object in) {
348 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
349 InstallSnapshot is = (InstallSnapshot)
350 SerializationUtils.fromSerializable(in);
351 if (is.getData() == null) {
352 return "InstallSnapshot data is null";
354 if (is.getLastIncludedIndex() != snapshotIndex) {
355 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
357 if (is.getLastIncludedTerm() != snapshotTerm) {
358 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
360 if (is.getTerm() == currentTerm) {
361 return is.getTerm() + "!=" + currentTerm;
367 return "message mismatch:" + in.getClass();
370 }.get(); // this extracts the received message
372 assertEquals("match", out);
379 public void testHandleInstallSnapshotReplyLastChunk() {
380 new LeaderTestKit(getSystem()) {{
381 new Within(duration("1 seconds")) {
382 protected void run() {
383 ActorRef followerActor = getTestActor();
385 Map<String, String> peerAddresses = new HashMap();
386 peerAddresses.put(followerActor.path().toString(),
387 followerActor.path().toString());
389 MockRaftActorContext actorContext =
390 (MockRaftActorContext) createActorContext();
391 actorContext.setPeerAddresses(peerAddresses);
393 final int followersLastIndex = 2;
394 final int snapshotIndex = 3;
395 final int newEntryIndex = 4;
396 final int snapshotTerm = 1;
397 final int currentTerm = 2;
399 MockLeader leader = new MockLeader(actorContext);
400 // set the follower info in leader
401 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
403 Map<String, String> leadersSnapshot = new HashMap<>();
404 leadersSnapshot.put("1", "A");
405 leadersSnapshot.put("2", "B");
406 leadersSnapshot.put("3", "C");
408 // set the snapshot variables in replicatedlog
409 actorContext.getReplicatedLog().setSnapshot(
410 toByteString(leadersSnapshot));
411 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
412 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
413 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
415 ByteString bs = toByteString(leadersSnapshot);
416 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
417 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
418 leader.getFollowerToSnapshot().getNextChunk();
419 leader.getFollowerToSnapshot().incrementChunkIndex();
423 actorContext.getReplicatedLog().removeFrom(0);
425 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
426 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
427 leader.getFollowerToSnapshot().getChunkIndex(), true));
429 assertTrue(raftBehavior instanceof Leader);
431 assertEquals(leader.mapFollowerToSnapshot.size(), 0);
432 assertEquals(leader.followerToLog.size(), 1);
433 assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
434 FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
435 assertEquals(snapshotIndex, fli.getMatchIndex().get());
436 assertEquals(snapshotIndex, fli.getMatchIndex().get());
437 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
444 public void testFollowerToSnapshotLogic() {
446 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
448 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
450 public int getSnapshotChunkSize() {
455 MockLeader leader = new MockLeader(actorContext);
457 Map<String, String> leadersSnapshot = new HashMap<>();
458 leadersSnapshot.put("1", "A");
459 leadersSnapshot.put("2", "B");
460 leadersSnapshot.put("3", "C");
462 ByteString bs = toByteString(leadersSnapshot);
463 byte[] barray = bs.toByteArray();
465 leader.createFollowerToSnapshot("followerId", bs);
466 assertEquals(bs.size(), barray.length);
469 for (int i=0; i < barray.length; i = i + 50) {
473 if (i + 50 > barray.length) {
477 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
478 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
479 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
481 leader.getFollowerToSnapshot().markSendStatus(true);
482 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
483 leader.getFollowerToSnapshot().incrementChunkIndex();
487 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
491 @Override protected RaftActorBehavior createBehavior(
492 RaftActorContext actorContext) {
493 return new Leader(actorContext);
496 @Override protected RaftActorContext createActorContext() {
497 return createActorContext(leaderActor);
500 protected RaftActorContext createActorContext(ActorRef actorRef) {
501 return new MockRaftActorContext("test", getSystem(), actorRef);
504 private ByteString toByteString(Map<String, String> state) {
505 ByteArrayOutputStream b = null;
506 ObjectOutputStream o = null;
509 b = new ByteArrayOutputStream();
510 o = new ObjectOutputStream(b);
511 o.writeObject(state);
512 byte[] snapshotBytes = b.toByteArray();
513 return ByteString.copyFrom(snapshotBytes);
523 } catch (IOException e) {
524 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
529 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
530 private static AbstractRaftActorBehavior behavior;
532 public ForwardMessageToBehaviorActor(){
536 @Override public void onReceive(Object message) throws Exception {
537 super.onReceive(message);
538 behavior.handleMessage(sender(), message);
541 public static void setBehavior(AbstractRaftActorBehavior behavior){
542 ForwardMessageToBehaviorActor.behavior = behavior;
547 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
548 new JavaTestKit(getSystem()) {{
550 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
552 MockRaftActorContext leaderActorContext =
553 new MockRaftActorContext("leader", getSystem(), leaderActor);
555 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
557 MockRaftActorContext followerActorContext =
558 new MockRaftActorContext("follower", getSystem(), followerActor);
560 Follower follower = new Follower(followerActorContext);
562 ForwardMessageToBehaviorActor.setBehavior(follower);
564 Map<String, String> peerAddresses = new HashMap();
565 peerAddresses.put(followerActor.path().toString(),
566 followerActor.path().toString());
568 leaderActorContext.setPeerAddresses(peerAddresses);
570 leaderActorContext.getReplicatedLog().removeFrom(0);
573 leaderActorContext.setReplicatedLog(
574 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
576 leaderActorContext.setCommitIndex(1);
578 followerActorContext.getReplicatedLog().removeFrom(0);
580 // follower too has the exact same log entries and has the same commit index
581 followerActorContext.setReplicatedLog(
582 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
584 followerActorContext.setCommitIndex(1);
586 Leader leader = new Leader(leaderActorContext);
588 leader.handleMessage(leaderActor, new SendHeartBeat());
590 AppendEntriesMessages.AppendEntries appendEntries =
591 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
592 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
594 assertNotNull(appendEntries);
596 assertEquals(1, appendEntries.getLeaderCommit());
597 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
598 assertEquals(0, appendEntries.getPrevLogIndex());
600 AppendEntriesReply appendEntriesReply =
601 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
602 leaderActor, AppendEntriesReply.class);
604 assertNotNull(appendEntriesReply);
606 // follower returns its next index
607 assertEquals(2, appendEntriesReply.getLogLastIndex());
608 assertEquals(1, appendEntriesReply.getLogLastTerm());
615 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
616 new JavaTestKit(getSystem()) {{
618 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
620 MockRaftActorContext leaderActorContext =
621 new MockRaftActorContext("leader", getSystem(), leaderActor);
623 ActorRef followerActor = getSystem().actorOf(
624 Props.create(ForwardMessageToBehaviorActor.class));
626 MockRaftActorContext followerActorContext =
627 new MockRaftActorContext("follower", getSystem(), followerActor);
629 Follower follower = new Follower(followerActorContext);
631 ForwardMessageToBehaviorActor.setBehavior(follower);
633 Map<String, String> peerAddresses = new HashMap();
634 peerAddresses.put(followerActor.path().toString(),
635 followerActor.path().toString());
637 leaderActorContext.setPeerAddresses(peerAddresses);
639 leaderActorContext.getReplicatedLog().removeFrom(0);
641 leaderActorContext.setReplicatedLog(
642 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
644 leaderActorContext.setCommitIndex(1);
646 followerActorContext.getReplicatedLog().removeFrom(0);
648 followerActorContext.setReplicatedLog(
649 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
651 // follower has the same log entries but its commit index > leaders commit index
652 followerActorContext.setCommitIndex(2);
654 Leader leader = new Leader(leaderActorContext);
656 leader.handleMessage(leaderActor, new SendHeartBeat());
658 AppendEntriesMessages.AppendEntries appendEntries =
659 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
660 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
662 assertNotNull(appendEntries);
664 assertEquals(1, appendEntries.getLeaderCommit());
665 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
666 assertEquals(0, appendEntries.getPrevLogIndex());
668 AppendEntriesReply appendEntriesReply =
669 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
670 leaderActor, AppendEntriesReply.class);
672 assertNotNull(appendEntriesReply);
674 assertEquals(2, appendEntriesReply.getLogLastIndex());
675 assertEquals(1, appendEntriesReply.getLogLastTerm());
681 public void testHandleAppendEntriesReplyFailure(){
682 new JavaTestKit(getSystem()) {
685 ActorRef leaderActor =
686 getSystem().actorOf(Props.create(MessageCollectorActor.class));
688 ActorRef followerActor =
689 getSystem().actorOf(Props.create(MessageCollectorActor.class));
692 MockRaftActorContext leaderActorContext =
693 new MockRaftActorContext("leader", getSystem(), leaderActor);
695 Map<String, String> peerAddresses = new HashMap();
696 peerAddresses.put("follower-1",
697 followerActor.path().toString());
699 leaderActorContext.setPeerAddresses(peerAddresses);
701 Leader leader = new Leader(leaderActorContext);
703 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
705 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
707 assertEquals(RaftState.Leader, raftActorBehavior.state());
713 public void testHandleAppendEntriesReplySuccess() throws Exception {
714 new JavaTestKit(getSystem()) {
717 ActorRef leaderActor =
718 getSystem().actorOf(Props.create(MessageCollectorActor.class));
720 ActorRef followerActor =
721 getSystem().actorOf(Props.create(MessageCollectorActor.class));
724 MockRaftActorContext leaderActorContext =
725 new MockRaftActorContext("leader", getSystem(), leaderActor);
727 leaderActorContext.setReplicatedLog(
728 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
730 Map<String, String> peerAddresses = new HashMap();
731 peerAddresses.put("follower-1",
732 followerActor.path().toString());
734 leaderActorContext.setPeerAddresses(peerAddresses);
735 leaderActorContext.setCommitIndex(1);
736 leaderActorContext.setLastApplied(1);
737 leaderActorContext.getTermInformation().update(1, "leader");
739 Leader leader = new Leader(leaderActorContext);
741 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
743 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
745 assertEquals(RaftState.Leader, raftActorBehavior.state());
747 assertEquals(2, leaderActorContext.getCommitIndex());
749 ApplyLogEntries applyLogEntries =
750 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
751 ApplyLogEntries.class);
753 assertNotNull(applyLogEntries);
755 assertEquals(2, leaderActorContext.getLastApplied());
757 assertEquals(2, applyLogEntries.getToIndex());
759 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
762 assertEquals(1,applyStateList.size());
764 ApplyState applyState = (ApplyState) applyStateList.get(0);
766 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
772 public void testHandleAppendEntriesReplyUnknownFollower(){
773 new JavaTestKit(getSystem()) {
776 ActorRef leaderActor =
777 getSystem().actorOf(Props.create(MessageCollectorActor.class));
779 MockRaftActorContext leaderActorContext =
780 new MockRaftActorContext("leader", getSystem(), leaderActor);
782 Leader leader = new Leader(leaderActorContext);
784 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
786 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
788 assertEquals(RaftState.Leader, raftActorBehavior.state());
794 public void testHandleRequestVoteReply(){
795 new JavaTestKit(getSystem()) {
798 ActorRef leaderActor =
799 getSystem().actorOf(Props.create(MessageCollectorActor.class));
801 MockRaftActorContext leaderActorContext =
802 new MockRaftActorContext("leader", getSystem(), leaderActor);
804 Leader leader = new Leader(leaderActorContext);
806 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
808 assertEquals(RaftState.Leader, raftActorBehavior.state());
810 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
812 assertEquals(RaftState.Leader, raftActorBehavior.state());
819 private static class LeaderTestKit extends JavaTestKit {
821 private LeaderTestKit(ActorSystem actorSystem) {
825 protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
826 // Wait for a specific log message to show up
827 final boolean result =
828 new JavaTestKit.EventFilter<Boolean>(logLevel
831 protected Boolean run() {
834 }.from(subject.path().toString())
836 .occurrences(1).exec();
838 Assert.assertEquals(true, result);
843 class MockLeader extends Leader {
845 FollowerToSnapshot fts;
847 public MockLeader(RaftActorContext context){
851 public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
852 FollowerLogInformation followerLogInformation =
853 new FollowerLogInformationImpl(followerId,
854 new AtomicLong(nextIndex),
855 new AtomicLong(matchIndex));
856 followerToLog.put(followerId, followerLogInformation);
859 public FollowerToSnapshot getFollowerToSnapshot() {
863 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
864 fts = new FollowerToSnapshot(bs);
865 mapFollowerToSnapshot.put(followerId, fts);