1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import com.google.common.base.Optional;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
13 import org.opendaylight.controller.cluster.raft.RaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftState;
15 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
16 import org.opendaylight.controller.cluster.raft.SerializationUtils;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
20 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
21 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
22 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
23 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
26 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
29 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
30 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
31 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
32 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
33 import scala.concurrent.duration.FiniteDuration;
35 import java.io.ByteArrayOutputStream;
36 import java.io.IOException;
37 import java.io.ObjectOutputStream;
38 import java.util.HashMap;
39 import java.util.List;
41 import java.util.concurrent.TimeUnit;
43 import static org.junit.Assert.assertEquals;
44 import static org.junit.Assert.assertNotNull;
45 import static org.junit.Assert.assertTrue;
47 public class LeaderTest extends AbstractRaftActorBehaviorTest {
49 private ActorRef leaderActor =
50 getSystem().actorOf(Props.create(DoNothingActor.class));
51 private ActorRef senderActor =
52 getSystem().actorOf(Props.create(DoNothingActor.class));
55 public void testHandleMessageForUnknownMessage() throws Exception {
56 new JavaTestKit(getSystem()) {{
58 new Leader(createActorContext());
60 // handle message should return the Leader state when it receives an
62 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
63 Assert.assertTrue(behavior instanceof Leader);
69 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
70 new JavaTestKit(getSystem()) {{
72 new Within(duration("1 seconds")) {
73 protected void run() {
75 ActorRef followerActor = getTestActor();
77 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
79 Map<String, String> peerAddresses = new HashMap<>();
81 peerAddresses.put(followerActor.path().toString(),
82 followerActor.path().toString());
84 actorContext.setPeerAddresses(peerAddresses);
86 Leader leader = new Leader(actorContext);
87 leader.handleMessage(senderActor, new SendHeartBeat());
90 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
91 // do not put code outside this method, will run afterwards
92 protected String match(Object in) {
93 Object msg = fromSerializableMessage(in);
94 if (msg instanceof AppendEntries) {
95 if (((AppendEntries)msg).getTerm() == 0) {
103 }.get(); // this extracts the received message
105 assertEquals("match", out);
113 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
114 new JavaTestKit(getSystem()) {{
116 new Within(duration("1 seconds")) {
117 protected void run() {
119 ActorRef followerActor = getTestActor();
121 MockRaftActorContext actorContext =
122 (MockRaftActorContext) createActorContext();
124 Map<String, String> peerAddresses = new HashMap<>();
126 peerAddresses.put(followerActor.path().toString(),
127 followerActor.path().toString());
129 actorContext.setPeerAddresses(peerAddresses);
131 Leader leader = new Leader(actorContext);
132 RaftActorBehavior raftBehavior = leader
133 .handleMessage(senderActor, new Replicate(null, null,
134 new MockRaftActorContext.MockReplicatedLogEntry(1,
136 new MockRaftActorContext.MockPayload("foo"))
139 // State should not change
140 assertTrue(raftBehavior instanceof Leader);
143 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
144 // do not put code outside this method, will run afterwards
145 protected String match(Object in) {
146 Object msg = fromSerializableMessage(in);
147 if (msg instanceof AppendEntries) {
148 if (((AppendEntries)msg).getTerm() == 0) {
156 }.get(); // this extracts the received message
158 assertEquals("match", out);
165 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
166 new JavaTestKit(getSystem()) {{
168 new Within(duration("1 seconds")) {
169 protected void run() {
171 ActorRef raftActor = getTestActor();
173 MockRaftActorContext actorContext =
174 new MockRaftActorContext("test", getSystem(), raftActor);
176 actorContext.getReplicatedLog().removeFrom(0);
178 actorContext.setReplicatedLog(
179 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
182 Leader leader = new Leader(actorContext);
183 RaftActorBehavior raftBehavior = leader
184 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
186 // State should not change
187 assertTrue(raftBehavior instanceof Leader);
189 assertEquals(1, actorContext.getCommitIndex());
192 new ExpectMsg<String>(duration("1 seconds"),
194 // do not put code outside this method, will run afterwards
195 protected String match(Object in) {
196 if (in instanceof ApplyState) {
197 if (((ApplyState) in).getIdentifier().equals("state-id")) {
205 }.get(); // this extracts the received message
207 assertEquals("match", out);
215 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
216 new JavaTestKit(getSystem()) {{
217 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
219 Map<String, String> peerAddresses = new HashMap<>();
220 peerAddresses.put(followerActor.path().toString(),
221 followerActor.path().toString());
223 MockRaftActorContext actorContext =
224 (MockRaftActorContext) createActorContext(leaderActor);
225 actorContext.setPeerAddresses(peerAddresses);
227 Map<String, String> leadersSnapshot = new HashMap<>();
228 leadersSnapshot.put("1", "A");
229 leadersSnapshot.put("2", "B");
230 leadersSnapshot.put("3", "C");
233 actorContext.getReplicatedLog().removeFrom(0);
235 final int followersLastIndex = 2;
236 final int snapshotIndex = 3;
237 final int newEntryIndex = 4;
238 final int snapshotTerm = 1;
239 final int currentTerm = 2;
241 // set the snapshot variables in replicatedlog
242 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
243 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
244 actorContext.setCommitIndex(followersLastIndex);
245 //set follower timeout to 2 mins, helps during debugging
246 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
248 MockLeader leader = new MockLeader(actorContext);
251 ReplicatedLogImplEntry entry =
252 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
253 new MockRaftActorContext.MockPayload("D"));
255 //update follower timestamp
256 leader.markFollowerActive(followerActor.path().toString());
258 ByteString bs = toByteString(leadersSnapshot);
259 leader.setSnapshot(Optional.of(bs));
260 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
262 //send first chunk and no InstallSnapshotReply received yet
263 leader.getFollowerToSnapshot().getNextChunk();
264 leader.getFollowerToSnapshot().incrementChunkIndex();
266 leader.handleMessage(leaderActor, new SendHeartBeat());
268 AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
269 followerActor, AppendEntries.SERIALIZABLE_CLASS);
271 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
272 "received", aeproto);
274 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
276 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
278 //InstallSnapshotReply received
279 leader.getFollowerToSnapshot().markSendStatus(true);
281 leader.handleMessage(senderActor, new SendHeartBeat());
283 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
284 MessageCollectorActor.getFirstMatching(followerActor,
285 InstallSnapshot.SERIALIZABLE_CLASS);
287 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
290 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
292 assertEquals(snapshotIndex, is.getLastIncludedIndex());
298 public void testSendAppendEntriesSnapshotScenario() {
299 new JavaTestKit(getSystem()) {{
301 ActorRef followerActor = getTestActor();
303 Map<String, String> peerAddresses = new HashMap<>();
304 peerAddresses.put(followerActor.path().toString(),
305 followerActor.path().toString());
307 MockRaftActorContext actorContext =
308 (MockRaftActorContext) createActorContext(getRef());
309 actorContext.setPeerAddresses(peerAddresses);
311 Map<String, String> leadersSnapshot = new HashMap<>();
312 leadersSnapshot.put("1", "A");
313 leadersSnapshot.put("2", "B");
314 leadersSnapshot.put("3", "C");
317 actorContext.getReplicatedLog().removeFrom(0);
319 final int followersLastIndex = 2;
320 final int snapshotIndex = 3;
321 final int newEntryIndex = 4;
322 final int snapshotTerm = 1;
323 final int currentTerm = 2;
325 // set the snapshot variables in replicatedlog
326 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
327 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
328 actorContext.setCommitIndex(followersLastIndex);
330 Leader leader = new Leader(actorContext);
333 ReplicatedLogImplEntry entry =
334 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
335 new MockRaftActorContext.MockPayload("D"));
337 //update follower timestamp
338 leader.markFollowerActive(followerActor.path().toString());
340 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
341 RaftActorBehavior raftBehavior = leader.handleMessage(
342 senderActor, new Replicate(null, "state-id", entry));
344 assertTrue(raftBehavior instanceof Leader);
346 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
347 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
349 protected Boolean match(Object o) throws Exception {
350 if (o instanceof InitiateInstallSnapshot) {
357 boolean initiateInitiateInstallSnapshot = false;
358 for (Boolean b: matches) {
359 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
362 assertTrue(initiateInitiateInstallSnapshot);
367 public void testInitiateInstallSnapshot() throws Exception {
368 new JavaTestKit(getSystem()) {{
370 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
372 ActorRef followerActor = getTestActor();
374 Map<String, String> peerAddresses = new HashMap<>();
375 peerAddresses.put(followerActor.path().toString(),
376 followerActor.path().toString());
379 MockRaftActorContext actorContext =
380 (MockRaftActorContext) createActorContext(leaderActor);
381 actorContext.setPeerAddresses(peerAddresses);
383 Map<String, String> leadersSnapshot = new HashMap<>();
384 leadersSnapshot.put("1", "A");
385 leadersSnapshot.put("2", "B");
386 leadersSnapshot.put("3", "C");
389 actorContext.getReplicatedLog().removeFrom(0);
391 final int followersLastIndex = 2;
392 final int snapshotIndex = 3;
393 final int newEntryIndex = 4;
394 final int snapshotTerm = 1;
395 final int currentTerm = 2;
397 // set the snapshot variables in replicatedlog
398 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
399 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
400 actorContext.setLastApplied(3);
401 actorContext.setCommitIndex(followersLastIndex);
403 Leader leader = new Leader(actorContext);
404 // set the snapshot as absent and check if capture-snapshot is invoked.
405 leader.setSnapshot(Optional.<ByteString>absent());
408 ReplicatedLogImplEntry entry =
409 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
410 new MockRaftActorContext.MockPayload("D"));
412 actorContext.getReplicatedLog().append(entry);
414 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
415 RaftActorBehavior raftBehavior = leader.handleMessage(
416 leaderActor, new InitiateInstallSnapshot());
418 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
419 getFirstMatching(leaderActor, CaptureSnapshot.class);
423 assertTrue(cs.isInstallSnapshotInitiated());
424 assertEquals(3, cs.getLastAppliedIndex());
425 assertEquals(1, cs.getLastAppliedTerm());
426 assertEquals(4, cs.getLastIndex());
427 assertEquals(2, cs.getLastTerm());
432 public void testInstallSnapshot() {
433 new JavaTestKit(getSystem()) {{
435 ActorRef followerActor = getTestActor();
437 Map<String, String> peerAddresses = new HashMap<>();
438 peerAddresses.put(followerActor.path().toString(),
439 followerActor.path().toString());
441 MockRaftActorContext actorContext =
442 (MockRaftActorContext) createActorContext();
443 actorContext.setPeerAddresses(peerAddresses);
446 Map<String, String> leadersSnapshot = new HashMap<>();
447 leadersSnapshot.put("1", "A");
448 leadersSnapshot.put("2", "B");
449 leadersSnapshot.put("3", "C");
452 actorContext.getReplicatedLog().removeFrom(0);
454 final int followersLastIndex = 2;
455 final int snapshotIndex = 3;
456 final int newEntryIndex = 4;
457 final int snapshotTerm = 1;
458 final int currentTerm = 2;
460 // set the snapshot variables in replicatedlog
461 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
462 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
463 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
464 actorContext.setCommitIndex(followersLastIndex);
466 Leader leader = new Leader(actorContext);
469 ReplicatedLogImplEntry entry =
470 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
471 new MockRaftActorContext.MockPayload("D"));
473 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
474 new SendInstallSnapshot(toByteString(leadersSnapshot)));
476 assertTrue(raftBehavior instanceof Leader);
478 // check if installsnapshot gets called with the correct values.
480 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
481 // do not put code outside this method, will run afterwards
482 protected String match(Object in) {
483 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
484 InstallSnapshot is = (InstallSnapshot)
485 SerializationUtils.fromSerializable(in);
486 if (is.getData() == null) {
487 return "InstallSnapshot data is null";
489 if (is.getLastIncludedIndex() != snapshotIndex) {
490 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
492 if (is.getLastIncludedTerm() != snapshotTerm) {
493 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
495 if (is.getTerm() == currentTerm) {
496 return is.getTerm() + "!=" + currentTerm;
502 return "message mismatch:" + in.getClass();
505 }.get(); // this extracts the received message
507 assertEquals("match", out);
512 public void testHandleInstallSnapshotReplyLastChunk() {
513 new JavaTestKit(getSystem()) {{
515 ActorRef followerActor = getTestActor();
517 Map<String, String> peerAddresses = new HashMap<>();
518 peerAddresses.put(followerActor.path().toString(),
519 followerActor.path().toString());
521 final int followersLastIndex = 2;
522 final int snapshotIndex = 3;
523 final int newEntryIndex = 4;
524 final int snapshotTerm = 1;
525 final int currentTerm = 2;
527 MockRaftActorContext actorContext =
528 (MockRaftActorContext) createActorContext();
529 actorContext.setPeerAddresses(peerAddresses);
530 actorContext.setCommitIndex(followersLastIndex);
532 MockLeader leader = new MockLeader(actorContext);
534 Map<String, String> leadersSnapshot = new HashMap<>();
535 leadersSnapshot.put("1", "A");
536 leadersSnapshot.put("2", "B");
537 leadersSnapshot.put("3", "C");
539 // set the snapshot variables in replicatedlog
541 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
542 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
543 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
545 ByteString bs = toByteString(leadersSnapshot);
546 leader.setSnapshot(Optional.of(bs));
547 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
548 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
549 leader.getFollowerToSnapshot().getNextChunk();
550 leader.getFollowerToSnapshot().incrementChunkIndex();
554 actorContext.getReplicatedLog().removeFrom(0);
556 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
557 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
558 leader.getFollowerToSnapshot().getChunkIndex(), true));
560 assertTrue(raftBehavior instanceof Leader);
562 assertEquals(leader.mapFollowerToSnapshot.size(), 0);
563 assertEquals(leader.followerToLog.size(), 1);
564 assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
565 FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
566 assertEquals(snapshotIndex, fli.getMatchIndex().get());
567 assertEquals(snapshotIndex, fli.getMatchIndex().get());
568 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
573 public void testFollowerToSnapshotLogic() {
575 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
577 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
579 public int getSnapshotChunkSize() {
584 MockLeader leader = new MockLeader(actorContext);
586 Map<String, String> leadersSnapshot = new HashMap<>();
587 leadersSnapshot.put("1", "A");
588 leadersSnapshot.put("2", "B");
589 leadersSnapshot.put("3", "C");
591 ByteString bs = toByteString(leadersSnapshot);
592 byte[] barray = bs.toByteArray();
594 leader.createFollowerToSnapshot("followerId", bs);
595 assertEquals(bs.size(), barray.length);
598 for (int i=0; i < barray.length; i = i + 50) {
602 if (i + 50 > barray.length) {
606 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
607 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
608 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
610 leader.getFollowerToSnapshot().markSendStatus(true);
611 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
612 leader.getFollowerToSnapshot().incrementChunkIndex();
616 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
620 @Override protected RaftActorBehavior createBehavior(
621 RaftActorContext actorContext) {
622 return new Leader(actorContext);
625 @Override protected RaftActorContext createActorContext() {
626 return createActorContext(leaderActor);
629 protected RaftActorContext createActorContext(ActorRef actorRef) {
630 return new MockRaftActorContext("test", getSystem(), actorRef);
633 private ByteString toByteString(Map<String, String> state) {
634 ByteArrayOutputStream b = null;
635 ObjectOutputStream o = null;
638 b = new ByteArrayOutputStream();
639 o = new ObjectOutputStream(b);
640 o.writeObject(state);
641 byte[] snapshotBytes = b.toByteArray();
642 return ByteString.copyFrom(snapshotBytes);
652 } catch (IOException e) {
653 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
658 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
659 private static AbstractRaftActorBehavior behavior;
661 public ForwardMessageToBehaviorActor(){
665 @Override public void onReceive(Object message) throws Exception {
666 super.onReceive(message);
667 behavior.handleMessage(sender(), message);
670 public static void setBehavior(AbstractRaftActorBehavior behavior){
671 ForwardMessageToBehaviorActor.behavior = behavior;
676 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
677 new JavaTestKit(getSystem()) {{
679 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
681 MockRaftActorContext leaderActorContext =
682 new MockRaftActorContext("leader", getSystem(), leaderActor);
684 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
686 MockRaftActorContext followerActorContext =
687 new MockRaftActorContext("follower", getSystem(), followerActor);
689 Follower follower = new Follower(followerActorContext);
691 ForwardMessageToBehaviorActor.setBehavior(follower);
693 Map<String, String> peerAddresses = new HashMap<>();
694 peerAddresses.put(followerActor.path().toString(),
695 followerActor.path().toString());
697 leaderActorContext.setPeerAddresses(peerAddresses);
699 leaderActorContext.getReplicatedLog().removeFrom(0);
702 leaderActorContext.setReplicatedLog(
703 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
705 leaderActorContext.setCommitIndex(1);
707 followerActorContext.getReplicatedLog().removeFrom(0);
709 // follower too has the exact same log entries and has the same commit index
710 followerActorContext.setReplicatedLog(
711 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
713 followerActorContext.setCommitIndex(1);
715 Leader leader = new Leader(leaderActorContext);
716 leader.markFollowerActive(followerActor.path().toString());
718 leader.handleMessage(leaderActor, new SendHeartBeat());
720 AppendEntriesMessages.AppendEntries appendEntries =
721 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
722 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
724 assertNotNull(appendEntries);
726 assertEquals(1, appendEntries.getLeaderCommit());
727 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
728 assertEquals(0, appendEntries.getPrevLogIndex());
730 AppendEntriesReply appendEntriesReply =
731 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
732 leaderActor, AppendEntriesReply.class);
734 assertNotNull(appendEntriesReply);
736 // follower returns its next index
737 assertEquals(2, appendEntriesReply.getLogLastIndex());
738 assertEquals(1, appendEntriesReply.getLogLastTerm());
745 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
746 new JavaTestKit(getSystem()) {{
748 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
750 MockRaftActorContext leaderActorContext =
751 new MockRaftActorContext("leader", getSystem(), leaderActor);
753 ActorRef followerActor = getSystem().actorOf(
754 Props.create(ForwardMessageToBehaviorActor.class));
756 MockRaftActorContext followerActorContext =
757 new MockRaftActorContext("follower", getSystem(), followerActor);
759 Follower follower = new Follower(followerActorContext);
761 ForwardMessageToBehaviorActor.setBehavior(follower);
763 Map<String, String> peerAddresses = new HashMap<>();
764 peerAddresses.put(followerActor.path().toString(),
765 followerActor.path().toString());
767 leaderActorContext.setPeerAddresses(peerAddresses);
769 leaderActorContext.getReplicatedLog().removeFrom(0);
771 leaderActorContext.setReplicatedLog(
772 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
774 leaderActorContext.setCommitIndex(1);
776 followerActorContext.getReplicatedLog().removeFrom(0);
778 followerActorContext.setReplicatedLog(
779 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
781 // follower has the same log entries but its commit index > leaders commit index
782 followerActorContext.setCommitIndex(2);
784 Leader leader = new Leader(leaderActorContext);
785 leader.markFollowerActive(followerActor.path().toString());
787 leader.handleMessage(leaderActor, new SendHeartBeat());
789 AppendEntriesMessages.AppendEntries appendEntries =
790 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
791 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
793 assertNotNull(appendEntries);
795 assertEquals(1, appendEntries.getLeaderCommit());
796 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
797 assertEquals(0, appendEntries.getPrevLogIndex());
799 AppendEntriesReply appendEntriesReply =
800 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
801 leaderActor, AppendEntriesReply.class);
803 assertNotNull(appendEntriesReply);
805 assertEquals(2, appendEntriesReply.getLogLastIndex());
806 assertEquals(1, appendEntriesReply.getLogLastTerm());
812 public void testHandleAppendEntriesReplyFailure(){
813 new JavaTestKit(getSystem()) {
816 ActorRef leaderActor =
817 getSystem().actorOf(Props.create(MessageCollectorActor.class));
819 ActorRef followerActor =
820 getSystem().actorOf(Props.create(MessageCollectorActor.class));
823 MockRaftActorContext leaderActorContext =
824 new MockRaftActorContext("leader", getSystem(), leaderActor);
826 Map<String, String> peerAddresses = new HashMap<>();
827 peerAddresses.put("follower-1",
828 followerActor.path().toString());
830 leaderActorContext.setPeerAddresses(peerAddresses);
832 Leader leader = new Leader(leaderActorContext);
834 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
836 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
838 assertEquals(RaftState.Leader, raftActorBehavior.state());
844 public void testHandleAppendEntriesReplySuccess() throws Exception {
845 new JavaTestKit(getSystem()) {
848 ActorRef leaderActor =
849 getSystem().actorOf(Props.create(MessageCollectorActor.class));
851 ActorRef followerActor =
852 getSystem().actorOf(Props.create(MessageCollectorActor.class));
855 MockRaftActorContext leaderActorContext =
856 new MockRaftActorContext("leader", getSystem(), leaderActor);
858 leaderActorContext.setReplicatedLog(
859 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
861 Map<String, String> peerAddresses = new HashMap<>();
862 peerAddresses.put("follower-1",
863 followerActor.path().toString());
865 leaderActorContext.setPeerAddresses(peerAddresses);
866 leaderActorContext.setCommitIndex(1);
867 leaderActorContext.setLastApplied(1);
868 leaderActorContext.getTermInformation().update(1, "leader");
870 Leader leader = new Leader(leaderActorContext);
872 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
874 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
876 assertEquals(RaftState.Leader, raftActorBehavior.state());
878 assertEquals(2, leaderActorContext.getCommitIndex());
880 ApplyLogEntries applyLogEntries =
881 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
882 ApplyLogEntries.class);
884 assertNotNull(applyLogEntries);
886 assertEquals(2, leaderActorContext.getLastApplied());
888 assertEquals(2, applyLogEntries.getToIndex());
890 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
893 assertEquals(1,applyStateList.size());
895 ApplyState applyState = (ApplyState) applyStateList.get(0);
897 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
903 public void testHandleAppendEntriesReplyUnknownFollower(){
904 new JavaTestKit(getSystem()) {
907 ActorRef leaderActor =
908 getSystem().actorOf(Props.create(MessageCollectorActor.class));
910 MockRaftActorContext leaderActorContext =
911 new MockRaftActorContext("leader", getSystem(), leaderActor);
913 Leader leader = new Leader(leaderActorContext);
915 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
917 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
919 assertEquals(RaftState.Leader, raftActorBehavior.state());
925 public void testHandleRequestVoteReply(){
926 new JavaTestKit(getSystem()) {
929 ActorRef leaderActor =
930 getSystem().actorOf(Props.create(MessageCollectorActor.class));
932 MockRaftActorContext leaderActorContext =
933 new MockRaftActorContext("leader", getSystem(), leaderActor);
935 Leader leader = new Leader(leaderActorContext);
937 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
939 assertEquals(RaftState.Leader, raftActorBehavior.state());
941 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
943 assertEquals(RaftState.Leader, raftActorBehavior.state());
950 class MockLeader extends Leader {
952 FollowerToSnapshot fts;
954 public MockLeader(RaftActorContext context){
958 public FollowerToSnapshot getFollowerToSnapshot() {
962 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
963 fts = new FollowerToSnapshot(bs);
964 mapFollowerToSnapshot.put(followerId, fts);
969 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
971 private long electionTimeOutIntervalMillis;
972 private int snapshotChunkSize;
974 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
976 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
977 this.snapshotChunkSize = snapshotChunkSize;
981 public FiniteDuration getElectionTimeOutInterval() {
982 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
986 public int getSnapshotChunkSize() {
987 return snapshotChunkSize;