import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
- private final ActorRef leaderActor =
- getSystem().actorOf(Props.create(DoNothingActor.class));
- private final ActorRef senderActor =
- getSystem().actorOf(Props.create(DoNothingActor.class));
+ static final String FOLLOWER_ID = "follower";
- @Test
- public void testHandleMessageForUnknownMessage() throws Exception {
- new JavaTestKit(getSystem()) {{
- Leader leader =
- new Leader(createActorContext());
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
- // handle message should return the Leader state when it receives an
- // unknown message
- RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
- Assert.assertTrue(behavior instanceof Leader);
- }};
+ private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
+ Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
+
+ private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
+ Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
+
+ private Leader leader;
+
+ @After
+ public void tearDown() throws Exception {
+ if(leader != null) {
+ leader.close();
+ }
+
+ actorFactory.close();
+ }
+
+ private void logStart(String name) {
+ LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
}
@Test
- public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
- new JavaTestKit(getSystem()) {{
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
- ActorRef followerActor = getTestActor();
+ public void testHandleMessageForUnknownMessage() throws Exception {
+ logStart("testHandleMessageForUnknownMessage");
+
+ leader = new Leader(createActorContext());
- MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+ // handle message should return the Leader state when it receives an
+ // unknown message
+ RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
+ Assert.assertTrue(behavior instanceof Leader);
+ }
- Map<String, String> peerAddresses = new HashMap<>();
+ @Test
+ public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
+ logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
- String followerId = "follower";
- peerAddresses.put(followerId, followerActor.path().toString());
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
- long term = 1;
- actorContext.getTermInformation().update(term, "");
+ leader = new Leader(actorContext);
- Leader leader = new Leader(actorContext);
+ // Leader should send an immediate heartbeat with no entries as follower is inactive.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getTerm", term, appendEntries.getTerm());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
- // Leader should send an immediate heartbeat with no entries as follower is inactive.
- long lastIndex = actorContext.getReplicatedLog().lastIndex();
- AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
- assertEquals("getTerm", term, appendEntries.getTerm());
- assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
- assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
- assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ // The follower would normally reply - simulate that explicitly here.
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex - 1, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
- // The follower would normally reply - simulate that explicitly here.
- leader.handleMessage(followerActor, new AppendEntriesReply(
- followerId, term, true, lastIndex - 1, term));
- assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+ followerActor.underlyingActor().clear();
- // Sleep for the heartbeat interval so AppendEntries is sent.
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
- getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+ getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
- leader.handleMessage(senderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
- assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
- assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
- assertEquals("Entries size", 1, appendEntries.getEntries().size());
- assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
- assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
- }
- };
- }};
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
}
@Test
- public void testHandleReplicateMessageSendAppendEntriesToFollower() {
- new JavaTestKit(getSystem()) {{
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
- ActorRef followerActor = getTestActor();
-
- MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
-
- Map<String, String> peerAddresses = new HashMap<>();
+ public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
+ logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
- String followerId = "follower";
- peerAddresses.put(followerId, followerActor.path().toString());
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
- long term = 1;
- actorContext.getTermInformation().update(term, "");
+ leader = new Leader(actorContext);
- Leader leader = new Leader(actorContext);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- // Leader will send an immediate heartbeat - ignore it.
- expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
- // The follower would normally reply - simulate that explicitly here.
- long lastIndex = actorContext.getReplicatedLog().lastIndex();
- leader.handleMessage(followerActor, new AppendEntriesReply(
- followerId, term, true, lastIndex, term));
- assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+ followerActor.underlyingActor().clear();
- MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
- MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
- 1, lastIndex + 1, payload);
- actorContext.getReplicatedLog().append(newEntry);
- RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
- new Replicate(null, null, newEntry));
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 1, lastIndex + 1, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new Replicate(null, null, newEntry));
- // State should not change
- assertTrue(raftBehavior instanceof Leader);
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
- AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
- assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
- assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
- assertEquals("Entries size", 1, appendEntries.getEntries().size());
- assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
- assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
- assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
- }
- };
- }};
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
}
@Test
- public void testHandleReplicateMessageWhenThereAreNoFollowers() {
- new JavaTestKit(getSystem()) {{
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
+ public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
+ logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
- ActorRef raftActor = getTestActor();
+ MockRaftActorContext actorContext = createActorContext();
- MockRaftActorContext actorContext =
- new MockRaftActorContext("test", getSystem(), raftActor);
+ leader = new Leader(actorContext);
- actorContext.getReplicatedLog().removeFrom(0);
+ actorContext.setLastApplied(0);
- actorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
- .build());
+ long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
+ long term = actorContext.getTermInformation().getCurrentTerm();
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
- Leader leader = new Leader(actorContext);
- RaftActorBehavior raftBehavior = leader
- .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
+ actorContext.getReplicatedLog().append(newEntry);
- // State should not change
- assertTrue(raftBehavior instanceof Leader);
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new Replicate(leaderActor, "state-id", newEntry));
- assertEquals(1, actorContext.getCommitIndex());
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
- final String out =
- new ExpectMsg<String>(duration("1 seconds"),
- "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in instanceof ApplyState) {
- if (((ApplyState) in).getIdentifier().equals("state-id")) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
- assertEquals("match", out);
+ // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
+ // one since lastApplied state is 0.
+ List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
+ leaderActor, ApplyState.class);
+ assertEquals("ApplyState count", newLogIndex, applyStateList.size());
- }
- };
- }};
+ for(int i = 0; i <= newLogIndex - 1; i++ ) {
+ ApplyState applyState = applyStateList.get(i);
+ assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
+ }
+
+ ApplyState last = applyStateList.get((int) newLogIndex - 1);
+ assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
+ assertEquals("getIdentifier", "state-id", last.getIdentifier());
}
@Test
public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
- new JavaTestKit(getSystem()) {{
- ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
-
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext(leaderActor);
- actorContext.setPeerAddresses(peerAddresses);
-
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
-
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
-
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
-
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setCommitIndex(followersLastIndex);
- //set follower timeout to 2 mins, helps during debugging
- actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
-
- MockLeader leader = new MockLeader(actorContext);
-
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
+ logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
- //update follower timestamp
- leader.markFollowerActive(followerActor.path().toString());
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
- leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- //send first chunk and no InstallSnapshotReply received yet
- leader.getFollowerToSnapshot().getNextChunk();
- leader.getFollowerToSnapshot().incrementChunkIndex();
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(followersLastIndex);
+ //set follower timeout to 2 mins, helps during debugging
+ actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
- AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
- followerActor, AppendEntries.class);
+ leader = new Leader(actorContext);
- assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
- "received", aeproto);
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
- assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+ FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ leader.setFollowerSnapshot(FOLLOWER_ID, fts);
- //InstallSnapshotReply received
- leader.getFollowerToSnapshot().markSendStatus(true);
+ //send first chunk and no InstallSnapshotReply received yet
+ fts.getNextChunk();
+ fts.incrementChunkIndex();
- leader.handleMessage(senderActor, new SendHeartBeat());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
- InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
- InstallSnapshot.SERIALIZABLE_CLASS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
- isproto);
+ AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+ AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
- assertEquals(snapshotIndex, is.getLastIncludedIndex());
+ assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
- }};
- }
+ //InstallSnapshotReply received
+ fts.markSendStatus(true);
- @Test
- public void testSendAppendEntriesSnapshotScenario() {
- new JavaTestKit(getSystem()) {{
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- ActorRef followerActor = getTestActor();
+ InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
+ InstallSnapshot.SERIALIZABLE_CLASS);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext(getRef());
- actorContext.setPeerAddresses(peerAddresses);
+ assertEquals(snapshotIndex, is.getLastIncludedIndex());
+ }
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ @Test
+ public void testSendAppendEntriesSnapshotScenario() throws Exception {
+ logStart("testSendAppendEntriesSnapshotScenario");
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setCommitIndex(followersLastIndex);
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- Leader leader = new Leader(actorContext);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(followersLastIndex);
- //update follower timestamp
- leader.markFollowerActive(followerActor.path().toString());
+ leader = new Leader(actorContext);
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
- RaftActorBehavior raftBehavior = leader.handleMessage(
- senderActor, new Replicate(null, "state-id", entry));
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- assertTrue(raftBehavior instanceof Leader);
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
- // we might receive some heartbeat messages, so wait till we get CaptureSnapshot
- Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
- @Override
- protected Boolean match(Object o) throws Exception {
- if (o instanceof CaptureSnapshot) {
- return true;
- }
- return false;
- }
- }.get();
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftActorBehavior raftBehavior = leader.handleMessage(
+ leaderActor, new Replicate(null, "state-id", entry));
- boolean captureSnapshot = false;
- for (Boolean b: matches) {
- captureSnapshot = b | captureSnapshot;
- }
+ assertTrue(raftBehavior instanceof Leader);
- assertTrue(captureSnapshot);
- }};
+ MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
}
@Test
public void testInitiateInstallSnapshot() throws Exception {
- new JavaTestKit(getSystem()) {{
+ logStart("testInitiateInstallSnapshot");
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- ActorRef followerActor = getTestActor();
-
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
-
- MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(leaderActor);
- actorContext.setPeerAddresses(peerAddresses);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setLastApplied(3);
+ actorContext.setCommitIndex(followersLastIndex);
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setLastApplied(3);
- actorContext.setCommitIndex(followersLastIndex);
+ leader = new Leader(actorContext);
- Leader leader = new Leader(actorContext);
- // set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(Optional.<ByteString>absent());
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- // new entry
- ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
+ // set the snapshot as absent and check if capture-snapshot is invoked.
+ leader.setSnapshot(Optional.<ByteString>absent());
- actorContext.getReplicatedLog().append(entry);
+ // new entry
+ ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- //update follower timestamp
- leader.markFollowerActive(followerActor.path().toString());
+ actorContext.getReplicatedLog().append(entry);
- RaftActorBehavior raftBehavior = leader.handleMessage(
- senderActor, new Replicate(null, "state-id", entry));
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
- CaptureSnapshot cs = MessageCollectorActor.
- getFirstMatching(leaderActor, CaptureSnapshot.class);
+ leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- assertNotNull(cs);
+ CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
- assertTrue(cs.isInstallSnapshotInitiated());
- assertEquals(3, cs.getLastAppliedIndex());
- assertEquals(1, cs.getLastAppliedTerm());
- assertEquals(4, cs.getLastIndex());
- assertEquals(2, cs.getLastTerm());
+ assertTrue(cs.isInstallSnapshotInitiated());
+ assertEquals(3, cs.getLastAppliedIndex());
+ assertEquals(1, cs.getLastAppliedTerm());
+ assertEquals(4, cs.getLastIndex());
+ assertEquals(2, cs.getLastTerm());
- // if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(senderActor, new Replicate(null, "state-id", entry));
- List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
- assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
+ // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+ leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- }};
+ List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
}
@Test
- public void testInstallSnapshot() {
- new JavaTestKit(getSystem()) {{
+ public void testInstallSnapshot() throws Exception {
+ logStart("testInstallSnapshot");
- ActorRef followerActor = getTestActor();
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
- actorContext.setPeerAddresses(peerAddresses);
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ actorContext.setCommitIndex(followersLastIndex);
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ leader = new Leader(actorContext);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ // Ignore initial heartbeat.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- actorContext.setCommitIndex(followersLastIndex);
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new SendInstallSnapshot(toByteString(leadersSnapshot)));
- Leader leader = new Leader(actorContext);
+ assertTrue(raftBehavior instanceof Leader);
- // Ignore initial heartbeat.
- expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ // check if installsnapshot gets called with the correct values.
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
+ InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
+ MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
- RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
- new SendInstallSnapshot(toByteString(leadersSnapshot)));
+ assertNotNull(installSnapshot.getData());
+ assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+ assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
- assertTrue(raftBehavior instanceof Leader);
-
- // check if installsnapshot gets called with the correct values.
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
- InstallSnapshot is = (InstallSnapshot)
- SerializationUtils.fromSerializable(in);
- if (is.getData() == null) {
- return "InstallSnapshot data is null";
- }
- if (is.getLastIncludedIndex() != snapshotIndex) {
- return is.getLastIncludedIndex() + "!=" + snapshotIndex;
- }
- if (is.getLastIncludedTerm() != snapshotTerm) {
- return is.getLastIncludedTerm() + "!=" + snapshotTerm;
- }
- if (is.getTerm() == currentTerm) {
- return is.getTerm() + "!=" + currentTerm;
- }
-
- return "match";
-
- } else {
- return "message mismatch:" + in.getClass();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
- }};
+ // FIXME - we don't set the term in the serialized message.
+ //assertEquals(currentTerm, installSnapshot.getTerm());
}
@Test
- public void testHandleInstallSnapshotReplyLastChunk() {
- new JavaTestKit(getSystem()) {{
+ public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
+ logStart("testHandleInstallSnapshotReplyLastChunk");
- ActorRef followerActor = getTestActor();
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
-
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
-
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
- actorContext.setPeerAddresses(peerAddresses);
- actorContext.setCommitIndex(followersLastIndex);
-
- MockLeader leader = new MockLeader(actorContext);
-
- // Ignore initial heartbeat.
- expectMsgClass(duration("5 seconds"), AppendEntries.class);
-
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
-
- // set the snapshot variables in replicatedlog
-
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-
- ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
- leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
- while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
- leader.getFollowerToSnapshot().getNextChunk();
- leader.getFollowerToSnapshot().incrementChunkIndex();
- }
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ actorContext.setCommitIndex(followersLastIndex);
- RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
- new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
- leader.getFollowerToSnapshot().getChunkIndex(), true));
+ leader = new Leader(actorContext);
- assertTrue(raftBehavior instanceof Leader);
+ // Ignore initial heartbeat.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertEquals(0, leader.followerSnapshotSize());
- assertEquals(1, leader.followerLogSize());
- assertNotNull(leader.getFollower(followerActor.path().toString()));
- FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
- assertEquals(snapshotIndex, fli.getMatchIndex());
- assertEquals(snapshotIndex, fli.getMatchIndex());
- assertEquals(snapshotIndex + 1, fli.getNextIndex());
- }};
- }
- @Test
- public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
- new JavaTestKit(getSystem()) {{
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- TestActorRef<MessageCollectorActor> followerActor =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+ // set the snapshot variables in replicatedlog
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower-reply",
- followerActor.path().toString());
-
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
-
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
- DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
- @Override
- public int getSnapshotChunkSize() {
- return 50;
- }
- };
- configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
- configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+ FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+ while(!fts.isLastChunk(fts.getChunkIndex())) {
+ fts.getNextChunk();
+ fts.incrementChunkIndex();
+ }
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- actorContext.setConfigParams(configParams);
- actorContext.setPeerAddresses(peerAddresses);
- actorContext.setCommitIndex(followersLastIndex);
+ RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
+ new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
- MockLeader leader = new MockLeader(actorContext);
+ assertTrue(raftBehavior instanceof Leader);
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ assertEquals(0, leader.followerSnapshotSize());
+ assertEquals(1, leader.followerLogSize());
+ FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
+ assertNotNull(fli);
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex());
+ }
+
+ @Test
+ public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+ logStart("testSendSnapshotfromInstallSnapshotReply");
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ };
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
- List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
- InstallSnapshotMessages.InstallSnapshot.class);
+ actorContext.setConfigParams(configParams);
+ actorContext.setCommitIndex(followersLastIndex);
- assertEquals(1, objectList.size());
+ leader = new Leader(actorContext);
- Object o = objectList.get(0);
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- assertEquals(1, installSnapshot.getChunkIndex());
- assertEquals(3, installSnapshot.getTotalChunks());
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
- leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
- "follower-reply", installSnapshot.getChunkIndex(), true));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- objectList = MessageCollectorActor.getAllMatching(followerActor,
- InstallSnapshotMessages.InstallSnapshot.class);
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- assertEquals(2, objectList.size());
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
- installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+ followerActor.underlyingActor().clear();
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
- "follower-reply", installSnapshot.getChunkIndex(), true));
+ installSnapshot = MessageCollectorActor.expectFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- objectList = MessageCollectorActor.getAllMatching(followerActor,
- InstallSnapshotMessages.InstallSnapshot.class);
+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(3, objectList.size());
+ followerActor.underlyingActor().clear();
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+ installSnapshot = MessageCollectorActor.expectFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
- leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
- "follower-reply", installSnapshot.getChunkIndex(), true));
+ // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+ followerActor.underlyingActor().clear();
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- objectList = MessageCollectorActor.getAllMatching(followerActor,
- InstallSnapshotMessages.InstallSnapshot.class);
+ installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- // Count should still stay at 3
- assertEquals(3, objectList.size());
- }};
+ Assert.assertNull(installSnapshot);
}
@Test
public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
- new JavaTestKit(getSystem()) {{
-
- TestActorRef<MessageCollectorActor> followerActor =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
-
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- actorContext.setConfigParams(new DefaultConfigParamsImpl(){
- @Override
- public int getSnapshotChunkSize() {
- return 50;
- }
- });
- actorContext.setPeerAddresses(peerAddresses);
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
- MockLeader leader = new MockLeader(actorContext);
+ actorContext.setCommitIndex(followersLastIndex);
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ leader = new Leader(actorContext);
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
- MessageCollectorActor.getAllMatching(followerActor,
- InstallSnapshotMessages.InstallSnapshot.class);
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- assertNotNull(installSnapshot);
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- assertEquals(1, installSnapshot.getChunkIndex());
- assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
- followerActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
- leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
- followerActor.path().toString(), -1, false));
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ FOLLOWER_ID, -1, false));
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- installSnapshot = MessageCollectorActor.getFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- assertNotNull(installSnapshot);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- assertEquals(1, installSnapshot.getChunkIndex());
- assertEquals(3, installSnapshot.getTotalChunks());
+ installSnapshot = MessageCollectorActor.expectFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- followerActor.tell(PoisonPill.getInstance(), getRef());
- }};
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
}
@Test
public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- TestActorRef<MessageCollectorActor> followerActor =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
-
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- actorContext.setConfigParams(new DefaultConfigParamsImpl() {
- @Override
- public int getSnapshotChunkSize() {
- return 50;
- }
- });
- actorContext.setPeerAddresses(peerAddresses);
- actorContext.setCommitIndex(followersLastIndex);
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
- MockLeader leader = new MockLeader(actorContext);
+ actorContext.setCommitIndex(followersLastIndex);
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ leader = new Leader(actorContext);
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
- InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- assertNotNull(installSnapshot);
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- assertEquals(1, installSnapshot.getChunkIndex());
- assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- int hashCode = installSnapshot.getData().hashCode();
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
- followerActor.underlyingActor().clear();
+ int hashCode = installSnapshot.getData().hashCode();
- leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+ followerActor.underlyingActor().clear();
- installSnapshot = MessageCollectorActor.getFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- assertNotNull(installSnapshot);
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
+ FOLLOWER_ID, 1, true));
- assertEquals(2, installSnapshot.getChunkIndex());
- assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+ installSnapshot = MessageCollectorActor.expectFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
- followerActor.tell(PoisonPill.getInstance(), getRef());
- }};
+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
}
@Test
public void testFollowerToSnapshotLogic() {
+ logStart("testFollowerToSnapshotLogic");
- MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+ MockRaftActorContext actorContext = createActorContext();
actorContext.setConfigParams(new DefaultConfigParamsImpl() {
@Override
}
});
- MockLeader leader = new MockLeader(actorContext);
+ leader = new Leader(actorContext);
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
ByteString bs = toByteString(leadersSnapshot);
byte[] barray = bs.toByteArray();
- leader.createFollowerToSnapshot("followerId", bs);
+ FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+
assertEquals(bs.size(), barray.length);
int chunkIndex=0;
j = barray.length;
}
- ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+ ByteString chunk = fts.getNextChunk();
assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
- assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+ assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
- leader.getFollowerToSnapshot().markSendStatus(true);
- if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
- leader.getFollowerToSnapshot().incrementChunkIndex();
+ fts.markSendStatus(true);
+ if (!fts.isLastChunk(chunkIndex)) {
+ fts.incrementChunkIndex();
}
}
- assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+ assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
}
-
@Override protected RaftActorBehavior createBehavior(
RaftActorContext actorContext) {
return new Leader(actorContext);
}
- @Override protected RaftActorContext createActorContext() {
+ @Override
+ protected MockRaftActorContext createActorContext() {
return createActorContext(leaderActor);
}
@Override
- protected RaftActorContext createActorContext(ActorRef actorRef) {
+ protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ return createActorContext("leader", actorRef);
+ }
+
+ private MockRaftActorContext createActorContextWithFollower() {
+ MockRaftActorContext actorContext = createActorContext();
+ actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
+ followerActor.path().toString()).build());
+ return actorContext;
+ }
+
+ private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
- MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+ MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
context.setConfigParams(configParams);
return context;
}
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
- new JavaTestKit(getSystem()) {{
- TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
- Props.create(ForwardMessageToBehaviorActor.class));
+ logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
- TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
- ForwardMessageToBehaviorActor.props());
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
- MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().behavior = follower;
- Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower", followerActor.path().toString());
+ leaderActorContext.setPeerAddresses(peerAddresses);
- leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.getReplicatedLog().removeFrom(0);
- leaderActorContext.getReplicatedLog().removeFrom(0);
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
- //create 3 entries
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setCommitIndex(1);
- leaderActorContext.setCommitIndex(1);
+ followerActorContext.getReplicatedLog().removeFrom(0);
- followerActorContext.getReplicatedLog().removeFrom(0);
+ // follower too has the exact same log entries and has the same commit index
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
- // follower too has the exact same log entries and has the same commit index
- followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ followerActorContext.setCommitIndex(1);
- followerActorContext.setCommitIndex(1);
+ leader = new Leader(leaderActorContext);
- Leader leader = new Leader(leaderActorContext);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
- assertNotNull(appendEntries);
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(0, appendEntries.getPrevLogIndex());
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(0, appendEntries.getEntries().size());
- assertEquals(0, appendEntries.getPrevLogIndex());
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
- leaderActor, AppendEntriesReply.class);
- assertNotNull(appendEntriesReply);
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
- assertEquals(2, appendEntriesReply.getLogLastIndex());
- assertEquals(1, appendEntriesReply.getLogLastTerm());
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
- // follower returns its next index
- assertEquals(2, appendEntriesReply.getLogLastIndex());
- assertEquals(1, appendEntriesReply.getLogLastTerm());
- }};
+ follower.close();
}
-
@Test
public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
- new JavaTestKit(getSystem()) {{
- TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
- Props.create(ForwardMessageToBehaviorActor.class));
+ logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ MockRaftActorContext leaderActorContext = createActorContext();
- TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
- ForwardMessageToBehaviorActor.props());
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
- MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().behavior = follower;
- Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower", followerActor.path().toString());
+ leaderActorContext.setPeerAddresses(peerAddresses);
- leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.getReplicatedLog().removeFrom(0);
- leaderActorContext.getReplicatedLog().removeFrom(0);
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setCommitIndex(1);
- leaderActorContext.setCommitIndex(1);
+ followerActorContext.getReplicatedLog().removeFrom(0);
- followerActorContext.getReplicatedLog().removeFrom(0);
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
- followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ // follower has the same log entries but its commit index > leaders commit index
+ followerActorContext.setCommitIndex(2);
- // follower has the same log entries but its commit index > leaders commit index
- followerActorContext.setCommitIndex(2);
+ leader = new Leader(leaderActorContext);
- Leader leader = new Leader(leaderActorContext);
+ // Initial heartbeat
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- // Initial heartbeat
- AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
- assertNotNull(appendEntries);
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(0, appendEntries.getPrevLogIndex());
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(0, appendEntries.getEntries().size());
- assertEquals(0, appendEntries.getPrevLogIndex());
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
- leaderActor, AppendEntriesReply.class);
- assertNotNull(appendEntriesReply);
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
- assertEquals(2, appendEntriesReply.getLogLastIndex());
- assertEquals(1, appendEntriesReply.getLogLastTerm());
+ leaderActor.underlyingActor().behavior = leader;
+ leader.handleMessage(followerActor, appendEntriesReply);
- leaderActor.underlyingActor().behavior = leader;
- leader.handleMessage(followerActor, appendEntriesReply);
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
- leaderActor.underlyingActor().clear();
- followerActor.underlyingActor().clear();
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
- Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
- assertNotNull(appendEntries);
+ assertEquals(2, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(2, appendEntries.getPrevLogIndex());
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(0, appendEntries.getEntries().size());
- assertEquals(2, appendEntries.getPrevLogIndex());
+ appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
- appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
- assertNotNull(appendEntriesReply);
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
- assertEquals(2, appendEntriesReply.getLogLastIndex());
- assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(2, followerActorContext.getCommitIndex());
- assertEquals(1, followerActorContext.getCommitIndex());
- }};
+ follower.close();
}
@Test
public void testHandleAppendEntriesReplyFailure(){
- new JavaTestKit(getSystem()) {
- {
+ logStart("testHandleAppendEntriesReplyFailure");
- ActorRef leaderActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
- ActorRef followerActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ leader = new Leader(leaderActorContext);
+ // Send initial heartbeat reply with last index.
+ leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower-1",
- followerActor.path().toString());
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
- leaderActorContext.setPeerAddresses(peerAddresses);
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
- Leader leader = new Leader(leaderActorContext);
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
- AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
-
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
-
- assertEquals(RaftState.Leader, raftActorBehavior.state());
-
- }};
+ assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
}
@Test
public void testHandleAppendEntriesReplySuccess() throws Exception {
- new JavaTestKit(getSystem()) {
- {
-
- ActorRef leaderActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
- ActorRef followerActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
+ logStart("testHandleAppendEntriesReplySuccess");
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower-1",
- followerActor.path().toString());
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+ leaderActorContext.getTermInformation().update(1, "leader");
- leaderActorContext.setPeerAddresses(peerAddresses);
- leaderActorContext.setCommitIndex(1);
- leaderActorContext.setLastApplied(1);
- leaderActorContext.getTermInformation().update(1, "leader");
+ leader = new Leader(leaderActorContext);
- Leader leader = new Leader(leaderActorContext);
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
- AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ assertEquals(2, leaderActorContext.getCommitIndex());
- assertEquals(2, leaderActorContext.getCommitIndex());
+ ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
+ leaderActor, ApplyLogEntries.class);
- ApplyLogEntries applyLogEntries =
- MessageCollectorActor.getFirstMatching(leaderActor,
- ApplyLogEntries.class);
+ assertEquals(2, leaderActorContext.getLastApplied());
- assertNotNull(applyLogEntries);
+ assertEquals(2, applyLogEntries.getToIndex());
- assertEquals(2, leaderActorContext.getLastApplied());
+ List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+ ApplyState.class);
- assertEquals(2, applyLogEntries.getToIndex());
+ assertEquals(1,applyStateList.size());
- List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
- ApplyState.class);
+ ApplyState applyState = applyStateList.get(0);
- assertEquals(1,applyStateList.size());
-
- ApplyState applyState = (ApplyState) applyStateList.get(0);
-
- assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
-
- }};
+ assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
}
@Test
public void testHandleAppendEntriesReplyUnknownFollower(){
- new JavaTestKit(getSystem()) {
- {
+ logStart("testHandleAppendEntriesReplyUnknownFollower");
- ActorRef leaderActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ MockRaftActorContext leaderActorContext = createActorContext();
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ leader = new Leader(leaderActorContext);
- Leader leader = new Leader(leaderActorContext);
+ AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
- AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
-
- assertEquals(RaftState.Leader, raftActorBehavior.state());
-
- }};
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
}
@Test
public void testHandleRequestVoteReply(){
- new JavaTestKit(getSystem()) {
- {
-
- ActorRef leaderActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ logStart("testHandleRequestVoteReply");
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ MockRaftActorContext leaderActorContext = createActorContext();
- Leader leader = new Leader(leaderActorContext);
+ leader = new Leader(leaderActorContext);
- RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+ // Should be a no-op.
+ RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
+ new RequestVoteReply(1, true));
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
- raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+ raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
- assertEquals(RaftState.Leader, raftActorBehavior.state());
- }};
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
}
@Test
public void testIsolatedLeaderCheckNoFollowers() {
- new JavaTestKit(getSystem()) {{
- ActorRef leaderActor = getTestActor();
-
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ logStart("testIsolatedLeaderCheckNoFollowers");
- Map<String, String> peerAddresses = new HashMap<>();
- leaderActorContext.setPeerAddresses(peerAddresses);
+ MockRaftActorContext leaderActorContext = createActorContext();
- Leader leader = new Leader(leaderActorContext);
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue(behavior instanceof Leader);
- }};
+ leader = new Leader(leaderActorContext);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue(behavior instanceof Leader);
}
@Test
public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ logStart("testIsolatedLeaderCheckTwoFollowers");
+
new JavaTestKit(getSystem()) {{
ActorRef followerActor1 = getTestActor();
ActorRef followerActor2 = getTestActor();
- MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+ MockRaftActorContext leaderActorContext = createActorContext();
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put("follower-1", followerActor1.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
- Leader leader = new Leader(leaderActorContext);
+ leader = new Leader(leaderActorContext);
leader.stopIsolatedLeaderCheckSchedule();
leader.markFollowerActive("follower-1");
behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
behavior instanceof IsolatedLeader);
-
}};
}
@Test
public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
- new JavaTestKit(getSystem()) {{
- TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
- Props.create(MessageCollectorActor.class));
-
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
-
- DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
- configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
-
- leaderActorContext.setConfigParams(configParams);
+ logStart("testAppendEntryCallAtEndofAppendEntryReply");
- TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
- ForwardMessageToBehaviorActor.props());
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
- MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower-reply", getSystem(), followerActor);
-
- followerActorContext.setConfigParams(configParams);
-
- Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
-
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower-reply",
- followerActor.path().toString());
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
- leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setConfigParams(configParams);
- leaderActorContext.getReplicatedLog().removeFrom(0);
- leaderActorContext.setCommitIndex(-1);
- leaderActorContext.setLastApplied(-1);
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
- followerActorContext.getReplicatedLog().removeFrom(0);
- followerActorContext.setCommitIndex(-1);
- followerActorContext.setLastApplied(-1);
+ followerActorContext.setConfigParams(configParams);
- Leader leader = new Leader(leaderActorContext);
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().behavior = follower;
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
- leaderActor, AppendEntriesReply.class);
- assertNotNull(appendEntriesReply);
- System.out.println("appendEntriesReply: "+appendEntriesReply);
- leader.handleMessage(followerActor, appendEntriesReply);
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
- // Clear initial heartbeat messages
+ followerActorContext.getReplicatedLog().removeFrom(0);
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
- leaderActor.underlyingActor().clear();
- followerActor.underlyingActor().clear();
+ leader = new Leader(leaderActorContext);
- // create 3 entries
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
- leaderActorContext.setCommitIndex(1);
- leaderActorContext.setLastApplied(1);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
- Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
+ leader.handleMessage(followerActor, appendEntriesReply);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ // Clear initial heartbeat messages
- AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
- assertNotNull(appendEntries);
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
- // Should send first log entry
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(0, appendEntries.getEntries().get(0).getIndex());
- assertEquals(-1, appendEntries.getPrevLogIndex());
+ // create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
- appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
- assertNotNull(appendEntriesReply);
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
- assertEquals(1, appendEntriesReply.getLogLastTerm());
- assertEquals(0, appendEntriesReply.getLogLastIndex());
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- followerActor.underlyingActor().clear();
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+ // Should send first log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(-1, appendEntries.getPrevLogIndex());
- appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
- assertNotNull(appendEntries);
+ appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
- // Should send second log entry
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
- }};
- }
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(0, appendEntriesReply.getLogLastIndex());
- class MockLeader extends Leader {
+ followerActor.underlyingActor().clear();
- FollowerToSnapshot fts;
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
- public MockLeader(RaftActorContext context){
- super(context);
- }
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- public FollowerToSnapshot getFollowerToSnapshot() {
- return fts;
- }
+ // Should send second log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
- public void createFollowerToSnapshot(String followerId, ByteString bs ) {
- fts = new FollowerToSnapshot(bs);
- setFollowerSnapshot(followerId, fts);
- }
+ follower.close();
}
private class MockConfigParamsImpl extends DefaultConfigParamsImpl {