import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.slf4j.impl.SimpleLogger;
import scala.concurrent.duration.FiniteDuration;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
- private ActorRef leaderActor =
+ static {
+ // This enables trace logging for the tests.
+ System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace");
+ }
+
+ private final ActorRef leaderActor =
getSystem().actorOf(Props.create(DoNothingActor.class));
- private ActorRef senderActor =
+ private final ActorRef senderActor =
getSystem().actorOf(Props.create(DoNothingActor.class));
@Test
@Test
public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
-
ActorRef followerActor = getTestActor();
MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ String followerId = "follower";
+ peerAddresses.put(followerId, followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
Leader leader = new Leader(actorContext);
- leader.handleMessage(senderActor, new SendHeartBeat());
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ // Leader should send an immediate heartbeat with no entries as follower is inactive.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getTerm", term, appendEntries.getTerm());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
- assertEquals("match", out);
+ // The follower would normally reply - simulate that explicitly here.
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ followerId, term, true, lastIndex - 1, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+ getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(senderActor, new SendHeartBeat());
+ appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
}
};
}};
@Test
public void testHandleReplicateMessageSendAppendEntriesToFollower() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
-
ActorRef followerActor = getTestActor();
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
+ MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ String followerId = "follower";
+ peerAddresses.put(followerId, followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
Leader leader = new Leader(actorContext);
- RaftActorBehavior raftBehavior = leader
- .handleMessage(senderActor, new Replicate(null, null,
- new MockRaftActorContext.MockReplicatedLogEntry(1,
- 100,
- new MockRaftActorContext.MockPayload("foo"))
- ));
+
+ // Leader will send an immediate heartbeat - ignore it.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ followerId, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 1, lastIndex + 1, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+ new Replicate(null, null, newEntry));
// State should not change
assertTrue(raftBehavior instanceof Leader);
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
+ AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
}
};
}};
@Test
public void testHandleReplicateMessageWhenThereAreNoFollowers() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
leader.getFollowerToSnapshot().getNextChunk();
leader.getFollowerToSnapshot().incrementChunkIndex();
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
leader.handleMessage(leaderActor, new SendHeartBeat());
- AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
- followerActor, AppendEntries.SERIALIZABLE_CLASS);
+ AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
+ followerActor, AppendEntries.class);
assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
"received", aeproto);
leader.handleMessage(senderActor, new SendHeartBeat());
- InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
- MessageCollectorActor.getFirstMatching(followerActor,
- InstallSnapshot.SERIALIZABLE_CLASS);
+ InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
+ InstallSnapshot.SERIALIZABLE_CLASS);
assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
isproto);
//update follower timestamp
leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
senderActor, new Replicate(null, "state-id", entry));
RaftActorBehavior raftBehavior = leader.handleMessage(
leaderActor, new InitiateInstallSnapshot());
- CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+ CaptureSnapshot cs = MessageCollectorActor.
getFirstMatching(leaderActor, CaptureSnapshot.class);
assertNotNull(cs);
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
+
+ // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+ raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
+ List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
+
}};
}
Leader leader = new Leader(actorContext);
+ // Ignore initial heartbeat.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
// new entry
ReplicatedLogImplEntry entry =
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
MockLeader leader = new MockLeader(actorContext);
+ // Ignore initial heartbeat.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
assertEquals(snapshotIndex + 1, fli.getNextIndex());
}};
}
+ @Test
+ public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ };
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ actorContext.setConfigParams(configParams);
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(1, objectList.size());
+
+ Object o = objectList.get(0);
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(2, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(3, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+ // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ // Count should still stay at 3
+ assertEquals(3, objectList.size());
+ }};
+ }
+
@Test
- public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
new JavaTestKit(getSystem()) {{
TestActorRef<MessageCollectorActor> followerActor =
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+ MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
- InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
+ followerActor.underlyingActor().clear();
- leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ followerActor.path().toString(), -1, false));
- o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
new JavaTestKit(getSystem()) {
{
-
TestActorRef<MessageCollectorActor> followerActor =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(followerActor.path().toString(),
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
-
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
- InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
int hashCode = installSnapshot.getData().hashCode();
- leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ followerActor.underlyingActor().clear();
- o = MessageCollectorActor.getAllMessages(followerActor).get(1);
-
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
- installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(2, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
@Override
protected RaftActorContext createActorContext(ActorRef actorRef) {
- return new MockRaftActorContext("test", getSystem(), actorRef);
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+ context.setConfigParams(configParams);
+ return context;
}
private ByteString toByteString(Map<String, String> state) {
}
public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
- private static AbstractRaftActorBehavior behavior;
-
- public ForwardMessageToBehaviorActor(){
-
- }
+ AbstractRaftActorBehavior behavior;
@Override public void onReceive(Object message) throws Exception {
+ if(behavior != null) {
+ behavior.handleMessage(sender(), message);
+ }
+
super.onReceive(message);
- behavior.handleMessage(sender(), message);
}
- public static void setBehavior(AbstractRaftActorBehavior behavior){
- ForwardMessageToBehaviorActor.behavior = behavior;
+ public static Props props() {
+ return Props.create(ForwardMessageToBehaviorActor.class);
}
}
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
new JavaTestKit(getSystem()) {{
-
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(ForwardMessageToBehaviorActor.class));
MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
- ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ new MockRaftActorContext("follower", getSystem(), followerActor);
Follower follower = new Follower(followerActorContext);
-
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ followerActor.underlyingActor().behavior = follower;
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ peerAddresses.put("follower", followerActor.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
//create 3 entries
leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
leaderActorContext.setCommitIndex(1);
// follower too has the exact same log entries and has the same commit index
followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
followerActorContext.setCommitIndex(1);
Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive(followerActor.path().toString());
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- AppendEntriesMessages.AppendEntries appendEntries =
- (AppendEntriesMessages.AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+ assertEquals(0, appendEntries.getEntries().size());
assertEquals(0, appendEntries.getPrevLogIndex());
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
leaderActor, AppendEntriesReply.class);
-
assertNotNull(appendEntriesReply);
- // follower returns its next index
assertEquals(2, appendEntriesReply.getLogLastIndex());
assertEquals(1, appendEntriesReply.getLogLastTerm());
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
}};
}
@Test
public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
new JavaTestKit(getSystem()) {{
-
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(ForwardMessageToBehaviorActor.class));
MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
- ActorRef followerActor = getSystem().actorOf(
- Props.create(ForwardMessageToBehaviorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ new MockRaftActorContext("follower", getSystem(), followerActor);
Follower follower = new Follower(followerActorContext);
-
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ followerActor.underlyingActor().behavior = follower;
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ peerAddresses.put("follower", followerActor.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
leaderActorContext.getReplicatedLog().removeFrom(0);
leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
leaderActorContext.setCommitIndex(1);
followerActorContext.getReplicatedLog().removeFrom(0);
followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
// follower has the same log entries but its commit index > leaders commit index
followerActorContext.setCommitIndex(2);
Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive(followerActor.path().toString());
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- AppendEntriesMessages.AppendEntries appendEntries =
- (AppendEntriesMessages.AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+ // Initial heartbeat
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+ assertEquals(0, appendEntries.getEntries().size());
assertEquals(0, appendEntries.getPrevLogIndex());
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ leaderActor.underlyingActor().behavior = leader;
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(2, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
assertNotNull(appendEntriesReply);
assertEquals(2, appendEntriesReply.getLogLastIndex());
assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(1, followerActorContext.getCommitIndex());
}};
}
assertEquals(2, leaderActorContext.getCommitIndex());
ApplyLogEntries applyLogEntries =
- (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
- ApplyLogEntries.class);
+ MessageCollectorActor.getFirstMatching(leaderActor,
+ ApplyLogEntries.class);
assertNotNull(applyLogEntries);
}};
}
+
+ @Test
+ public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+ followerActorContext.setConfigParams(configParams);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().behavior = follower;
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+ System.out.println("appendEntriesReply: "+appendEntriesReply);
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ // Clear initial heartbeat messages
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ // create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+
+ // Should send first log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(-1, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(0, appendEntriesReply.getLogLastIndex());
+
+ followerActor.underlyingActor().clear();
+
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+ appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+
+ // Should send second log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ }};
+ }
+
class MockLeader extends Leader {
FollowerToSnapshot fts;
private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
- private long electionTimeOutIntervalMillis;
- private int snapshotChunkSize;
+ private final long electionTimeOutIntervalMillis;
+ private final int snapshotChunkSize;
public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
super();