1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.actor.Terminated;
7 import akka.testkit.JavaTestKit;
8 import com.google.common.base.Optional;
9 import com.google.common.util.concurrent.Uninterruptibles;
10 import com.google.protobuf.ByteString;
11 import java.io.ByteArrayOutputStream;
12 import java.io.IOException;
13 import java.io.ObjectOutputStream;
14 import java.util.HashMap;
15 import java.util.List;
17 import java.util.concurrent.TimeUnit;
18 import org.junit.Assert;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
21 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
22 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
23 import org.opendaylight.controller.cluster.raft.RaftActorContext;
24 import org.opendaylight.controller.cluster.raft.RaftState;
25 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
26 import org.opendaylight.controller.cluster.raft.SerializationUtils;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
29 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
32 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
33 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
34 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
36 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
37 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
39 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
40 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
41 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
42 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
43 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
44 import scala.concurrent.duration.FiniteDuration;
45 import static org.junit.Assert.assertEquals;
46 import static org.junit.Assert.assertNotNull;
47 import static org.junit.Assert.assertTrue;
49 public class LeaderTest extends AbstractRaftActorBehaviorTest {
// Actor handed to createActorContext() as the context's actor; a DoNothingActor on the test system.
51 private ActorRef leaderActor =
52 getSystem().actorOf(Props.create(DoNothingActor.class));
// Actor passed as the "sender" argument to handleMessage(...) in the tests below.
53 private ActorRef senderActor =
54 getSystem().actorOf(Props.create(DoNothingActor.class));
/**
 * Hands the leader a message it has no handler for (the plain string "foo") and
 * asserts that the behavior returned by handleMessage is still a Leader.
 */
57 public void testHandleMessageForUnknownMessage() throws Exception {
58 new JavaTestKit(getSystem()) {{
60 new Leader(createActorContext());
62 // handle message should return the Leader state when it receives an
64 RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
65 Assert.assertTrue(behavior instanceof Leader);
/**
 * Registers the test kit's own actor as the only peer, sends the leader a
 * SendHeartBeat and expects, within one second, a message that deserialises
 * to an AppendEntries carrying term 0.
 */
71 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
72 new JavaTestKit(getSystem()) {{
74 new Within(duration("1 seconds")) {
75 protected void run() {
// The test actor plays the follower so that ExpectMsg below can see what the leader sends.
77 ActorRef followerActor = getTestActor();
79 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
// The follower's actor path is used both as the peer id and as the peer address.
81 Map<String, String> peerAddresses = new HashMap<>();
83 peerAddresses.put(followerActor.path().toString(),
84 followerActor.path().toString());
86 actorContext.setPeerAddresses(peerAddresses);
88 Leader leader = new Leader(actorContext);
89 leader.handleMessage(senderActor, new SendHeartBeat());
92 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
93 // do not put code outside this method, will run afterwards
94 protected String match(Object in) {
// Messages arrive in serialised form; convert back before type-checking.
95 Object msg = fromSerializableMessage(in);
96 if (msg instanceof AppendEntries) {
97 if (((AppendEntries)msg).getTerm() == 0) {
105 }.get(); // this extracts the received message
107 assertEquals("match", out);
/**
 * Sends the leader a Replicate message for a mock log entry with payload "foo",
 * asserts the returned behavior is still a Leader, and expects the single peer
 * (the test actor) to receive an AppendEntries with term 0 within one second.
 */
115 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
116 new JavaTestKit(getSystem()) {{
118 new Within(duration("1 seconds")) {
119 protected void run() {
// The test actor plays the follower so that ExpectMsg below can see what the leader sends.
121 ActorRef followerActor = getTestActor();
123 MockRaftActorContext actorContext =
124 (MockRaftActorContext) createActorContext();
// The follower's actor path is used both as the peer id and as the peer address.
126 Map<String, String> peerAddresses = new HashMap<>();
128 peerAddresses.put(followerActor.path().toString(),
129 followerActor.path().toString());
131 actorContext.setPeerAddresses(peerAddresses);
133 Leader leader = new Leader(actorContext);
134 RaftActorBehavior raftBehavior = leader
135 .handleMessage(senderActor, new Replicate(null, null,
136 new MockRaftActorContext.MockReplicatedLogEntry(1,
138 new MockRaftActorContext.MockPayload("foo"))
141 // State should not change
142 assertTrue(raftBehavior instanceof Leader);
145 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
146 // do not put code outside this method, will run afterwards
147 protected String match(Object in) {
// Messages arrive in serialised form; convert back before type-checking.
148 Object msg = fromSerializableMessage(in);
149 if (msg instanceof AppendEntries) {
150 if (((AppendEntries)msg).getTerm() == 0) {
158 }.get(); // this extracts the received message
160 assertEquals("match", out);
/**
 * Builds a leader whose context has no peer addresses configured in this test,
 * replicates the log entry at index 1 under the identifier "state-id", and
 * checks that the commit index becomes 1 and that the context's actor (the
 * test actor) receives an ApplyState with that identifier.
 */
167 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
168 new JavaTestKit(getSystem()) {{
170 new Within(duration("1 seconds")) {
171 protected void run() {
// The test actor is the context's own actor here, so ExpectMsg sees what the leader sends to it.
173 ActorRef raftActor = getTestActor();
175 MockRaftActorContext actorContext =
176 new MockRaftActorContext("test", getSystem(), raftActor);
// Drop whatever entries the mock context starts with before installing the generated log.
178 actorContext.getReplicatedLog().removeFrom(0);
180 actorContext.setReplicatedLog(
181 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
184 Leader leader = new Leader(actorContext);
185 RaftActorBehavior raftBehavior = leader
186 .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
188 // State should not change
189 assertTrue(raftBehavior instanceof Leader);
191 assertEquals(1, actorContext.getCommitIndex());
194 new ExpectMsg<String>(duration("1 seconds"),
196 // do not put code outside this method, will run afterwards
197 protected String match(Object in) {
198 if (in instanceof ApplyState) {
199 if (((ApplyState) in).getIdentifier().equals("state-id")) {
207 }.get(); // this extracts the received message
209 assertEquals("match", out);
/**
 * Exercises heartbeats while a snapshot install to a follower is in progress.
 * With the first chunk marked as sent but not yet acknowledged, a heartbeat must
 * still produce an AppendEntries with no entries; once the chunk is marked as
 * acknowledged, the next heartbeat must produce an InstallSnapshot whose
 * lastIncludedIndex equals the snapshot index.
 */
217 public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
218 new JavaTestKit(getSystem()) {{
// A MessageCollectorActor records what the leader sends so it can be queried afterwards.
219 ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
221 Map<String, String> peerAddresses = new HashMap<>();
222 peerAddresses.put(followerActor.path().toString(),
223 followerActor.path().toString());
225 MockRaftActorContext actorContext =
226 (MockRaftActorContext) createActorContext(leaderActor);
227 actorContext.setPeerAddresses(peerAddresses);
// State map that is serialised below and used as the leader's snapshot bytes.
229 Map<String, String> leadersSnapshot = new HashMap<>();
230 leadersSnapshot.put("1", "A");
231 leadersSnapshot.put("2", "B");
232 leadersSnapshot.put("3", "C");
235 actorContext.getReplicatedLog().removeFrom(0);
237 final int followersLastIndex = 2;
238 final int snapshotIndex = 3;
239 final int newEntryIndex = 4;
240 final int snapshotTerm = 1;
241 final int currentTerm = 2;
243 // set the snapshot variables in replicatedlog
244 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
245 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
246 actorContext.setCommitIndex(followersLastIndex);
247 //set follower timeout to 2 mins, helps during debugging
// Arguments are (election timeout in milliseconds, snapshot chunk size).
248 actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
250 MockLeader leader = new MockLeader(actorContext);
253 ReplicatedLogImplEntry entry =
254 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
255 new MockRaftActorContext.MockPayload("D"));
257 //update follower timestamp
258 leader.markFollowerActive(followerActor.path().toString());
260 ByteString bs = toByteString(leadersSnapshot);
261 leader.setSnapshot(Optional.of(bs));
262 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
264 //send first chunk and no InstallSnapshotReply received yet
265 leader.getFollowerToSnapshot().getNextChunk();
266 leader.getFollowerToSnapshot().incrementChunkIndex();
268 leader.handleMessage(leaderActor, new SendHeartBeat());
270 AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
271 followerActor, AppendEntries.SERIALIZABLE_CLASS);
273 assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
274 "received", aeproto);
// Convert the protobuf form back to the domain message before inspecting it.
276 AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
278 assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
280 //InstallSnapshotReply received
281 leader.getFollowerToSnapshot().markSendStatus(true);
283 leader.handleMessage(senderActor, new SendHeartBeat());
285 InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
286 MessageCollectorActor.getFirstMatching(followerActor,
287 InstallSnapshot.SERIALIZABLE_CLASS);
289 assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
292 InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
294 assertEquals(snapshotIndex, is.getLastIncludedIndex());
/**
 * Sets the leader's snapshot index (3) ahead of the commit index (2), sends a
 * Replicate for a new entry at index 4, and verifies that an
 * InitiateInstallSnapshot message shows up on the test actor, which serves as
 * both the context's actor and the only peer in this test.
 */
300 public void testSendAppendEntriesSnapshotScenario() {
301 new JavaTestKit(getSystem()) {{
303 ActorRef followerActor = getTestActor();
305 Map<String, String> peerAddresses = new HashMap<>();
306 peerAddresses.put(followerActor.path().toString(),
307 followerActor.path().toString());
// getRef() makes the test kit's actor the context's actor, so messages the leader sends to it can be received below.
309 MockRaftActorContext actorContext =
310 (MockRaftActorContext) createActorContext(getRef());
311 actorContext.setPeerAddresses(peerAddresses);
313 Map<String, String> leadersSnapshot = new HashMap<>();
314 leadersSnapshot.put("1", "A");
315 leadersSnapshot.put("2", "B");
316 leadersSnapshot.put("3", "C");
319 actorContext.getReplicatedLog().removeFrom(0);
321 final int followersLastIndex = 2;
322 final int snapshotIndex = 3;
323 final int newEntryIndex = 4;
324 final int snapshotTerm = 1;
325 final int currentTerm = 2;
327 // set the snapshot variables in replicatedlog
328 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
329 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
330 actorContext.setCommitIndex(followersLastIndex);
332 Leader leader = new Leader(actorContext);
335 ReplicatedLogImplEntry entry =
336 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
337 new MockRaftActorContext.MockPayload("D"));
339 //update follower timestamp
340 leader.markFollowerActive(followerActor.path().toString());
342 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
343 RaftActorBehavior raftBehavior = leader.handleMessage(
344 senderActor, new Replicate(null, "state-id", entry));
346 assertTrue(raftBehavior instanceof Leader);
348 // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
349 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
351 protected Boolean match(Object o) throws Exception {
352 if (o instanceof InitiateInstallSnapshot) {
// OR together the per-message results: the test passes if any received message matched.
359 boolean initiateInitiateInstallSnapshot = false;
360 for (Boolean b: matches) {
361 initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
364 assertTrue(initiateInitiateInstallSnapshot);
/**
 * With the leader's snapshot set to absent, sends InitiateInstallSnapshot and
 * verifies that the leader's actor receives a CaptureSnapshot flagged as
 * install-snapshot-initiated, carrying last-applied index/term 3/1 and
 * last index/term 4/2.
 */
369 public void testInitiateInstallSnapshot() throws Exception {
370 new JavaTestKit(getSystem()) {{
// Local collector actor (shadows the leaderActor field) so the CaptureSnapshot can be inspected.
372 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
374 ActorRef followerActor = getTestActor();
376 Map<String, String> peerAddresses = new HashMap<>();
377 peerAddresses.put(followerActor.path().toString(),
378 followerActor.path().toString());
381 MockRaftActorContext actorContext =
382 (MockRaftActorContext) createActorContext(leaderActor);
383 actorContext.setPeerAddresses(peerAddresses);
385 Map<String, String> leadersSnapshot = new HashMap<>();
386 leadersSnapshot.put("1", "A");
387 leadersSnapshot.put("2", "B");
388 leadersSnapshot.put("3", "C");
391 actorContext.getReplicatedLog().removeFrom(0);
393 final int followersLastIndex = 2;
394 final int snapshotIndex = 3;
395 final int newEntryIndex = 4;
396 final int snapshotTerm = 1;
397 final int currentTerm = 2;
399 // set the snapshot variables in replicatedlog
400 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
401 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
402 actorContext.setLastApplied(3);
403 actorContext.setCommitIndex(followersLastIndex);
405 Leader leader = new Leader(actorContext);
406 // set the snapshot as absent and check if capture-snapshot is invoked.
407 leader.setSnapshot(Optional.<ByteString>absent());
410 ReplicatedLogImplEntry entry =
411 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
412 new MockRaftActorContext.MockPayload("D"));
// Appending the entry makes the log's last index/term 4/2, which the assertions below expect.
414 actorContext.getReplicatedLog().append(entry);
416 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
417 RaftActorBehavior raftBehavior = leader.handleMessage(
418 leaderActor, new InitiateInstallSnapshot());
420 CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
421 getFirstMatching(leaderActor, CaptureSnapshot.class);
425 assertTrue(cs.isInstallSnapshotInitiated());
426 assertEquals(3, cs.getLastAppliedIndex());
427 assertEquals(1, cs.getLastAppliedTerm());
428 assertEquals(4, cs.getLastIndex());
429 assertEquals(2, cs.getLastTerm());
/**
 * Sends the leader a SendInstallSnapshot carrying the serialised snapshot map
 * and verifies that the single peer (the test actor) receives an
 * InstallSnapshot with non-null data, lastIncludedIndex == snapshotIndex,
 * lastIncludedTerm == snapshotTerm and term == currentTerm. Each failed check
 * returns a diagnostic string, which then fails the final assertEquals.
 */
434 public void testInstallSnapshot() {
435 new JavaTestKit(getSystem()) {{
437 ActorRef followerActor = getTestActor();
439 Map<String, String> peerAddresses = new HashMap<>();
440 peerAddresses.put(followerActor.path().toString(),
441 followerActor.path().toString());
443 MockRaftActorContext actorContext =
444 (MockRaftActorContext) createActorContext();
445 actorContext.setPeerAddresses(peerAddresses);
448 Map<String, String> leadersSnapshot = new HashMap<>();
449 leadersSnapshot.put("1", "A");
450 leadersSnapshot.put("2", "B");
451 leadersSnapshot.put("3", "C");
454 actorContext.getReplicatedLog().removeFrom(0);
456 final int followersLastIndex = 2;
457 final int snapshotIndex = 3;
458 final int newEntryIndex = 4;
459 final int snapshotTerm = 1;
460 final int currentTerm = 2;
462 // set the snapshot variables in replicatedlog
463 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
464 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
465 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
466 actorContext.setCommitIndex(followersLastIndex);
468 Leader leader = new Leader(actorContext);
471 ReplicatedLogImplEntry entry =
472 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
473 new MockRaftActorContext.MockPayload("D"));
475 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
476 new SendInstallSnapshot(toByteString(leadersSnapshot)));
478 assertTrue(raftBehavior instanceof Leader);
480 // check if installsnapshot gets called with the correct values.
482 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
483 // do not put code outside this method, will run afterwards
484 protected String match(Object in) {
485 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
486 InstallSnapshot is = (InstallSnapshot)
487 SerializationUtils.fromSerializable(in);
488 if (is.getData() == null) {
489 return "InstallSnapshot data is null";
491 if (is.getLastIncludedIndex() != snapshotIndex) {
492 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
494 if (is.getLastIncludedTerm() != snapshotTerm) {
495 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
// Report a mismatch only when the terms differ, like the two checks above
// (the previous "==" reported a failure exactly when the term was correct).
497 if (is.getTerm() != currentTerm) {
498 return is.getTerm() + "!=" + currentTerm;
504 return "message mismatch:" + in.getClass();
507 }.get(); // this extracts the received message
509 assertEquals("match", out);
/**
 * Advances a MockLeader's FollowerToSnapshot to its last chunk, delivers a
 * successful InstallSnapshotReply for that chunk, and verifies that the
 * follower's snapshot-tracking entry is removed and that its match index and
 * next index become snapshotIndex and snapshotIndex + 1.
 */
514 public void testHandleInstallSnapshotReplyLastChunk() {
515 new JavaTestKit(getSystem()) {{
517 ActorRef followerActor = getTestActor();
519 Map<String, String> peerAddresses = new HashMap<>();
520 peerAddresses.put(followerActor.path().toString(),
521 followerActor.path().toString());
523 final int followersLastIndex = 2;
524 final int snapshotIndex = 3;
525 final int newEntryIndex = 4;
526 final int snapshotTerm = 1;
527 final int currentTerm = 2;
529 MockRaftActorContext actorContext =
530 (MockRaftActorContext) createActorContext();
531 actorContext.setPeerAddresses(peerAddresses);
532 actorContext.setCommitIndex(followersLastIndex);
534 MockLeader leader = new MockLeader(actorContext);
536 Map<String, String> leadersSnapshot = new HashMap<>();
537 leadersSnapshot.put("1", "A");
538 leadersSnapshot.put("2", "B");
539 leadersSnapshot.put("3", "C");
541 // set the snapshot variables in replicatedlog
543 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
544 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
545 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
547 ByteString bs = toByteString(leadersSnapshot);
548 leader.setSnapshot(Optional.of(bs));
549 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
// Walk the chunk index forward until it points at the last chunk.
550 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
551 leader.getFollowerToSnapshot().getNextChunk();
552 leader.getFollowerToSnapshot().incrementChunkIndex();
556 actorContext.getReplicatedLog().removeFrom(0);
558 RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
559 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
560 leader.getFollowerToSnapshot().getChunkIndex(), true));
562 assertTrue(raftBehavior instanceof Leader);
// Expected value goes first so JUnit failure messages read "expected:<0> but was:<n>".
564 assertEquals(0, leader.mapFollowerToSnapshot.size());
565 assertEquals(1, leader.followerToLog.size());
566 assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
567 FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
568 assertEquals(snapshotIndex, fli.getMatchIndex().get());
570 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
/**
 * Walks a serialised snapshot through FollowerToSnapshot chunk by chunk and
 * checks each chunk's size, the running chunk index and, at the end, the total
 * chunk count.
 */
575 public void testFollowerToSnapshotLogic() {
577 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
// Override the configured snapshot chunk size for this test.
// NOTE(review): the overridden value is not visible here; presumably 50 to match the loop stride below - confirm.
579 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
581 public int getSnapshotChunkSize() {
586 MockLeader leader = new MockLeader(actorContext);
588 Map<String, String> leadersSnapshot = new HashMap<>();
589 leadersSnapshot.put("1", "A");
590 leadersSnapshot.put("2", "B");
591 leadersSnapshot.put("3", "C");
593 ByteString bs = toByteString(leadersSnapshot);
594 byte[] barray = bs.toByteArray();
596 leader.createFollowerToSnapshot("followerId", bs);
597 assertEquals(bs.size(), barray.length);
// Step through the raw bytes in strides of 50, comparing against the chunks FollowerToSnapshot hands out.
600 for (int i=0; i < barray.length; i = i + 50) {
// Boundary check for the final stride, which may be shorter than 50 bytes.
604 if (i + 50 > barray.length) {
608 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
609 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
610 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
// Mark the chunk as successfully sent, then advance unless this was the last chunk.
612 leader.getFollowerToSnapshot().markSendStatus(true);
613 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
614 leader.getFollowerToSnapshot().incrementChunkIndex();
618 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
/**
 * Creates the behavior under test for the inherited test cases: a Leader built
 * on the supplied context.
 *
 * @param actorContext context the Leader is constructed with
 * @return a new Leader
 */
622 @Override protected RaftActorBehavior createBehavior(
623 RaftActorContext actorContext) {
624 return new Leader(actorContext);
/**
 * Creates a context whose actor is the shared leaderActor field.
 *
 * @return the context produced by createActorContext(leaderActor)
 */
627 @Override protected RaftActorContext createActorContext() {
628 return createActorContext(leaderActor);
/**
 * Creates a MockRaftActorContext with id "test" on the test actor system.
 *
 * @param actorRef actor to associate with the context
 * @return a new MockRaftActorContext
 */
631 protected RaftActorContext createActorContext(ActorRef actorRef) {
632 return new MockRaftActorContext("test", getSystem(), actorRef);
/**
 * Serialises the given map with Java object serialisation and wraps the
 * resulting bytes in a ByteString. An IOException during serialisation fails
 * the running test via Assert.fail.
 *
 * @param state map to serialise
 * @return ByteString holding the serialised bytes
 */
635 private ByteString toByteString(Map<String, String> state) {
636 ByteArrayOutputStream b = null;
637 ObjectOutputStream o = null;
640 b = new ByteArrayOutputStream();
641 o = new ObjectOutputStream(b);
642 o.writeObject(state);
643 byte[] snapshotBytes = b.toByteArray();
644 return ByteString.copyFrom(snapshotBytes);
654 } catch (IOException e) {
655 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
/**
 * A MessageCollectorActor that, after recording each message via
 * super.onReceive, also hands it to a raft behavior's handleMessage.
 * The behavior is held in a static field, so it is shared by every instance
 * of this actor; tests install it with setBehavior before sending messages.
 */
660 public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
661 private static AbstractRaftActorBehavior behavior;
663 public ForwardMessageToBehaviorActor(){
667 @Override public void onReceive(Object message) throws Exception {
// Record first, then forward to the behavior with the original sender.
668 super.onReceive(message);
669 behavior.handleMessage(sender(), message);
// Installs the behavior that all instances forward to.
672 public static void setBehavior(AbstractRaftActorBehavior behavior){
673 ForwardMessageToBehaviorActor.behavior = behavior;
/**
 * Leader and follower are given identical generated logs and the same commit
 * index (1). After a heartbeat, checks the AppendEntries the follower received
 * (leaderCommit 1, first entry index 1, prevLogIndex 0) and the
 * AppendEntriesReply the leader's actor received (logLastIndex 2, logLastTerm 1).
 */
678 public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
679 new JavaTestKit(getSystem()) {{
// Collector actor so the follower's AppendEntriesReply can be inspected.
681 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
683 MockRaftActorContext leaderActorContext =
684 new MockRaftActorContext("leader", getSystem(), leaderActor);
// The follower actor records messages and forwards them to a real Follower behavior.
686 ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
688 MockRaftActorContext followerActorContext =
689 new MockRaftActorContext("follower", getSystem(), followerActor);
691 Follower follower = new Follower(followerActorContext);
693 ForwardMessageToBehaviorActor.setBehavior(follower);
695 Map<String, String> peerAddresses = new HashMap<>();
696 peerAddresses.put(followerActor.path().toString(),
697 followerActor.path().toString());
699 leaderActorContext.setPeerAddresses(peerAddresses);
701 leaderActorContext.getReplicatedLog().removeFrom(0);
704 leaderActorContext.setReplicatedLog(
705 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
707 leaderActorContext.setCommitIndex(1);
709 followerActorContext.getReplicatedLog().removeFrom(0);
711 // follower too has the exact same log entries and has the same commit index
712 followerActorContext.setReplicatedLog(
713 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
715 followerActorContext.setCommitIndex(1);
717 Leader leader = new Leader(leaderActorContext);
718 leader.markFollowerActive(followerActor.path().toString());
720 leader.handleMessage(leaderActor, new SendHeartBeat());
722 AppendEntriesMessages.AppendEntries appendEntries =
723 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
724 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
726 assertNotNull(appendEntries);
728 assertEquals(1, appendEntries.getLeaderCommit());
729 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
730 assertEquals(0, appendEntries.getPrevLogIndex());
732 AppendEntriesReply appendEntriesReply =
733 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
734 leaderActor, AppendEntriesReply.class);
736 assertNotNull(appendEntriesReply);
738 // follower returns its next index
739 assertEquals(2, appendEntriesReply.getLogLastIndex());
740 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Same setup as the preceding test except that the follower's commit index (2)
 * is ahead of the leader's (1). After a heartbeat, the AppendEntries and
 * AppendEntriesReply are expected to carry the same values as in that test.
 */
747 public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
748 new JavaTestKit(getSystem()) {{
// Collector actor so the follower's AppendEntriesReply can be inspected.
750 ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
752 MockRaftActorContext leaderActorContext =
753 new MockRaftActorContext("leader", getSystem(), leaderActor);
// The follower actor records messages and forwards them to a real Follower behavior.
755 ActorRef followerActor = getSystem().actorOf(
756 Props.create(ForwardMessageToBehaviorActor.class));
758 MockRaftActorContext followerActorContext =
759 new MockRaftActorContext("follower", getSystem(), followerActor);
761 Follower follower = new Follower(followerActorContext);
763 ForwardMessageToBehaviorActor.setBehavior(follower);
765 Map<String, String> peerAddresses = new HashMap<>();
766 peerAddresses.put(followerActor.path().toString(),
767 followerActor.path().toString());
769 leaderActorContext.setPeerAddresses(peerAddresses);
771 leaderActorContext.getReplicatedLog().removeFrom(0);
773 leaderActorContext.setReplicatedLog(
774 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
776 leaderActorContext.setCommitIndex(1);
778 followerActorContext.getReplicatedLog().removeFrom(0);
780 followerActorContext.setReplicatedLog(
781 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
783 // follower has the same log entries but its commit index > leaders commit index
784 followerActorContext.setCommitIndex(2);
786 Leader leader = new Leader(leaderActorContext);
787 leader.markFollowerActive(followerActor.path().toString());
789 leader.handleMessage(leaderActor, new SendHeartBeat());
791 AppendEntriesMessages.AppendEntries appendEntries =
792 (AppendEntriesMessages.AppendEntries) MessageCollectorActor
793 .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
795 assertNotNull(appendEntries);
797 assertEquals(1, appendEntries.getLeaderCommit());
798 assertEquals(1, appendEntries.getLogEntries(0).getIndex());
799 assertEquals(0, appendEntries.getPrevLogIndex());
801 AppendEntriesReply appendEntriesReply =
802 (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
803 leaderActor, AppendEntriesReply.class);
805 assertNotNull(appendEntriesReply);
807 assertEquals(2, appendEntriesReply.getLogLastIndex());
808 assertEquals(1, appendEntriesReply.getLogLastTerm());
/**
 * Feeds the leader an unsuccessful AppendEntriesReply from the known peer
 * "follower-1" and asserts the resulting behavior's state is still Leader.
 */
814 public void testHandleAppendEntriesReplyFailure(){
815 new JavaTestKit(getSystem()) {
818 ActorRef leaderActor =
819 getSystem().actorOf(Props.create(MessageCollectorActor.class));
821 ActorRef followerActor =
822 getSystem().actorOf(Props.create(MessageCollectorActor.class));
825 MockRaftActorContext leaderActorContext =
826 new MockRaftActorContext("leader", getSystem(), leaderActor);
828 Map<String, String> peerAddresses = new HashMap<>();
829 peerAddresses.put("follower-1",
830 followerActor.path().toString());
832 leaderActorContext.setPeerAddresses(peerAddresses);
834 Leader leader = new Leader(leaderActorContext);
// Third constructor argument (false) marks the reply as unsuccessful.
836 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
838 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
840 assertEquals(RaftState.Leader, raftActorBehavior.state());
/**
 * Feeds the leader a successful AppendEntriesReply from "follower-1" reporting
 * log index 2 and checks the consequences: the leader stays Leader, commit
 * index and last-applied advance from 1 to 2, and the leader's actor receives
 * an ApplyLogEntries up to index 2 plus exactly one ApplyState for the entry
 * at index 2.
 */
846 public void testHandleAppendEntriesReplySuccess() throws Exception {
847 new JavaTestKit(getSystem()) {
850 ActorRef leaderActor =
851 getSystem().actorOf(Props.create(MessageCollectorActor.class));
853 ActorRef followerActor =
854 getSystem().actorOf(Props.create(MessageCollectorActor.class));
857 MockRaftActorContext leaderActorContext =
858 new MockRaftActorContext("leader", getSystem(), leaderActor);
860 leaderActorContext.setReplicatedLog(
861 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
863 Map<String, String> peerAddresses = new HashMap<>();
864 peerAddresses.put("follower-1",
865 followerActor.path().toString());
867 leaderActorContext.setPeerAddresses(peerAddresses);
// Start with commit index and last-applied at 1 so the advance to 2 is observable.
868 leaderActorContext.setCommitIndex(1);
869 leaderActorContext.setLastApplied(1);
870 leaderActorContext.getTermInformation().update(1, "leader");
872 Leader leader = new Leader(leaderActorContext);
// Third constructor argument (true) marks the reply as successful.
874 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
876 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
878 assertEquals(RaftState.Leader, raftActorBehavior.state());
880 assertEquals(2, leaderActorContext.getCommitIndex());
882 ApplyLogEntries applyLogEntries =
883 (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
884 ApplyLogEntries.class);
886 assertNotNull(applyLogEntries);
888 assertEquals(2, leaderActorContext.getLastApplied());
890 assertEquals(2, applyLogEntries.getToIndex());
892 List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
895 assertEquals(1,applyStateList.size());
897 ApplyState applyState = (ApplyState) applyStateList.get(0);
899 assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
/**
 * Feeds the leader an AppendEntriesReply from "follower-1" when no peer
 * addresses have been configured in this test, and asserts the resulting
 * behavior's state is still Leader.
 */
905 public void testHandleAppendEntriesReplyUnknownFollower(){
906 new JavaTestKit(getSystem()) {
909 ActorRef leaderActor =
910 getSystem().actorOf(Props.create(MessageCollectorActor.class));
912 MockRaftActorContext leaderActorContext =
913 new MockRaftActorContext("leader", getSystem(), leaderActor);
915 Leader leader = new Leader(leaderActorContext);
917 AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
919 RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
921 assertEquals(RaftState.Leader, raftActorBehavior.state());
/**
 * Feeds the leader two RequestVoteReply messages for term 1, one constructed
 * with true and one with false, and asserts the behavior's state stays Leader
 * after each.
 */
927 public void testHandleRequestVoteReply(){
928 new JavaTestKit(getSystem()) {
931 ActorRef leaderActor =
932 getSystem().actorOf(Props.create(MessageCollectorActor.class));
934 MockRaftActorContext leaderActorContext =
935 new MockRaftActorContext("leader", getSystem(), leaderActor);
937 Leader leader = new Leader(leaderActorContext);
939 RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
941 assertEquals(RaftState.Leader, raftActorBehavior.state());
943 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
945 assertEquals(RaftState.Leader, raftActorBehavior.state());
/**
 * With an empty peer-address map, sends the leader an IsolatedLeaderCheck and
 * asserts the returned behavior is still a Leader.
 */
950 public void testIsolatedLeaderCheckNoFollowers() {
951 new JavaTestKit(getSystem()) {{
952 ActorRef leaderActor = getTestActor();
954 MockRaftActorContext leaderActorContext =
955 new MockRaftActorContext("leader", getSystem(), leaderActor);
// Explicitly configure zero peers.
957 Map<String, String> peerAddresses = new HashMap<>();
958 leaderActorContext.setPeerAddresses(peerAddresses);
960 Leader leader = new Leader(leaderActorContext);
961 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
962 Assert.assertTrue(behavior instanceof Leader);
967 public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
968 new JavaTestKit(getSystem()) {{
970 ActorRef followerActor1 = getTestActor();
971 ActorRef followerActor2 = getTestActor();
973 MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
975 Map<String, String> peerAddresses = new HashMap<>();
976 peerAddresses.put("follower-1", followerActor1.path().toString());
977 peerAddresses.put("follower-2", followerActor2.path().toString());
979 leaderActorContext.setPeerAddresses(peerAddresses);
981 Leader leader = new Leader(leaderActorContext);
982 leader.stopIsolatedLeaderCheckSchedule();
984 leader.markFollowerActive("follower-1");
985 leader.markFollowerActive("follower-2");
986 RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
987 Assert.assertTrue("Behavior not instance of Leader when all followers are active",
988 behavior instanceof Leader);
990 // kill 1 follower and verify if that got killed
991 final JavaTestKit probe = new JavaTestKit(getSystem());
992 probe.watch(followerActor1);
993 followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
994 final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
995 assertEquals(termMsg1.getActor(), followerActor1);
997 //sleep enough for all the follower stopwatches to lapse
998 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
999 getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
1001 leader.markFollowerActive("follower-2");
1002 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1003 Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1004 behavior instanceof Leader);
1006 // kill 2nd follower and leader should change to Isolated leader
1007 followerActor2.tell(PoisonPill.getInstance(), null);
1008 probe.watch(followerActor2);
1009 followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1010 final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1011 assertEquals(termMsg2.getActor(), followerActor2);
1013 //sleep enough for the remaining the follower stopwatches to lapse
1014 Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
1015 getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
1017 behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1018 Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1019 behavior instanceof IsolatedLeader);
/**
 * Leader subclass that lets tests create a FollowerToSnapshot directly and
 * keep a handle on it, instead of going through the normal snapshot flow.
 */
1024 class MockLeader extends Leader {
// The most recently created FollowerToSnapshot (set by createFollowerToSnapshot).
1026 FollowerToSnapshot fts;
1028 public MockLeader(RaftActorContext context){
1032 public FollowerToSnapshot getFollowerToSnapshot() {
// Builds a FollowerToSnapshot over the given bytes and registers it for followerId in mapFollowerToSnapshot.
1036 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
1037 fts = new FollowerToSnapshot(bs);
1038 mapFollowerToSnapshot.put(followerId, fts);
/**
 * Config params with a caller-supplied election timeout and snapshot chunk
 * size; the two visible getters below return those values.
 */
1043 private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1045 private long electionTimeOutIntervalMillis;
1046 private int snapshotChunkSize;
// electionTimeOutIntervalMillis is in milliseconds (see getElectionTimeOutInterval below).
1048 public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1050 this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1051 this.snapshotChunkSize = snapshotChunkSize;
// Returns the configured timeout as a FiniteDuration in milliseconds.
1055 public FiniteDuration getElectionTimeOutInterval() {
1056 return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1060 public int getSnapshotChunkSize() {
1061 return snapshotChunkSize;