import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
return sendReplicate(actorContext, 1, index);
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
+ final long index) {
return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
+ final Payload payload) {
SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
actorContext.getReplicatedLog().append(newEntry);
return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
sendReplicate(actorContext, lastIndex + i + 1);
leader.handleMessage(followerActor, new AppendEntriesReply(
FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
-
}
- for (int i = 3; i < 5; i++) {
- sendReplicate(actorContext, lastIndex + i + 1);
+ // We are expecting six messages here -- a request to replicate and a consensus-reached message
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
+ for (int i = 0; i < 3; i++) {
+ assertRequestEntry(lastIndex, allMessages, i);
+ assertCommitEntry(lastIndex, allMessages, i);
}
- List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
- // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
- // get sent to the follower - but not the 5th
- assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+ // Now perform another commit, eliciting a request to persist
+ sendReplicate(actorContext, lastIndex + 3 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // This elicits another message for request to replicate
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ assertRequestEntry(lastIndex, allMessages, 3);
- for (int i = 0; i < 4; i++) {
- long expected = allMessages.get(i).getEntries().get(0).getIndex();
- assertEquals(expected, i + 2);
- }
+ sendReplicate(actorContext, lastIndex + 4 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ }
+
+ private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
+ assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
+ assertEquals(ImmutableList.of(), commitReq.getEntries());
+ }
+
+ private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries req = allMessages.get(2 * messageNr);
+ assertEquals(lastIndex + messageNr, req.getLeaderCommit());
+
+ final List<ReplicatedLogEntry> entries = req.getEntries();
+ assertEquals(1, entries.size());
+ assertEquals(messageNr + 2, entries.get(0).getIndex());
}
@Test