private long lastReplicatedIndex = -1L;
+ private long sentCommitIndex = -1L;
+
private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
private short payloadVersion = -1;
* sending duplicate message too frequently if the last replicate message was sent and no reply has been received
* yet within the current heart beat interval
*
+ * @param commitIndex current commitIndex
* @return true if it is OK to replicate, false otherwise
*/
- public boolean okToReplicate() {
+ public boolean okToReplicate(final long commitIndex) {
if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
return false;
}
- // Return false if we are trying to send duplicate data before the heartbeat interval
- if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
+ // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
+ if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
+ && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
< context.getConfigParams().getHeartBeatInterval().toMillis()) {
return false;
}
return slicedLogEntryIndex != NO_INDEX;
}
- public void setNeedsLeaderAddress(boolean value) {
+ public void setNeedsLeaderAddress(final boolean value) {
needsLeaderAddress = value;
}
+ public boolean hasStaleCommitIndex(final long commitIndex) {
+ return sentCommitIndex != commitIndex;
+ }
+
+ public void setSentCommitIndex(final long commitIndex) {
+ sentCommitIndex = commitIndex;
+ }
+
@Nullable
- public String needsLeaderAddress(String leaderId) {
+ public String needsLeaderAddress(final String leaderId) {
return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
}
@Override
public String toString() {
return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
- + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
- + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
- + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+ + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
+ + ", votingState=" + peerInfo.getVotingState()
+ + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
}
}
} else if (installSnapshotState.canSendNextChunk()) {
sendSnapshotChunk(followerActor, followerLogInformation);
}
- } else if (sendHeartbeat) {
+ } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) {
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntries = true;
}
log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
followerNextIndex, followerId);
- if (followerLogInformation.okToReplicate()) {
+ if (followerLogInformation.okToReplicate(context.getCommitIndex())) {
entries = getEntriesToSend(followerLogInformation, followerActor);
sendAppendEntries = true;
}
context.getReplicatedLog().size());
}
- } else if (sendHeartbeat) {
+ } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) {
// we send an AppendEntries, even if the follower is inactive
// in-order to update the followers timestamp, in case it becomes active again
sendAppendEntries = true;
appendEntries);
}
+ followerLogInformation.setSentCommitIndex(leaderCommitIndex);
followerActor.tell(appendEntries, actor());
}
// we cannot rely comfortably that the sleep will indeed sleep for the desired time
// hence getting the actual elapsed time and do a match.
// if the sleep has spilled over, then return the test gracefully
- private static long sleepWithElaspsedTimeReturned(long millis) {
+ private static long sleepWithElaspsedTimeReturned(final long millis) {
Stopwatch stopwatch = Stopwatch.createStarted();
Uninterruptibles.sleepUninterruptibly(millis, TimeUnit.MILLISECONDS);
stopwatch.stop();
FollowerLogInformation followerLogInformation =
new FollowerLogInformation(new PeerInfo("follower1", null, VotingState.VOTING), 10, context);
- assertTrue(followerLogInformation.okToReplicate());
- assertFalse(followerLogInformation.okToReplicate());
+ followerLogInformation.setSentCommitIndex(0);
+ assertTrue(followerLogInformation.okToReplicate(0));
+ assertFalse(followerLogInformation.okToReplicate(0));
// wait for 150 milliseconds and it should work again
Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
- assertTrue(followerLogInformation.okToReplicate());
+ assertTrue(followerLogInformation.okToReplicate(0));
//increment next index and try immediately and it should work again
followerLogInformation.incrNextIndex();
- assertTrue(followerLogInformation.okToReplicate());
+ assertTrue(followerLogInformation.okToReplicate(0));
}
@Test
context.setCommitIndex(0);
FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context);
- assertFalse(followerLogInformation.okToReplicate());
+ assertFalse(followerLogInformation.okToReplicate(0));
followerLogInformation.markFollowerActive();
assertFalse(followerLogInformation.isFollowerActive());
peerInfo.setVotingState(VotingState.VOTING);
- assertTrue(followerLogInformation.okToReplicate());
+ assertTrue(followerLogInformation.okToReplicate(0));
followerLogInformation.markFollowerActive();
assertTrue(followerLogInformation.isFollowerActive());
context.setCommitIndex(0);
FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context);
- assertTrue(followerLogInformation.okToReplicate());
+ assertTrue(followerLogInformation.okToReplicate(0));
followerLogInformation.markFollowerActive();
assertTrue(followerLogInformation.isFollowerActive());
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
// should switch to Candidate and send out RequestVote messages. Set member 1 and 3 actors
// to capture RequestVote but not to forward to the behavior just yet as we want to
// control the order of RequestVote messages to member 1 and 3.
-
- member1Actor.dropMessagesToBehavior(RequestVote.class);
-
member2Actor.expectBehaviorStateChange();
+ // member 1 and member 3 may reach consensus to consider leader's initial Noop entry as committed, hence
+ // leader would elicit this information to member 2.
+ // We do not want that, as member 2 would respond to that request either before it bumps or after it bumps its
+ // term -- if it would see that message post-bump, it would leak term 2 back to member 1, hence leader would
+ // know about it.
+ member2Actor.dropMessagesToBehavior(AppendEntries.class);
+
+ member1Actor.dropMessagesToBehavior(RequestVote.class);
member3Actor.dropMessagesToBehavior(RequestVote.class);
member2ActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
-
member1Actor.waitForExpectedMessages(RequestVote.class);
member3Actor.waitForExpectedMessages(RequestVote.class);
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
return sendReplicate(actorContext, 1, index);
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
+ final long index) {
return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
+ final Payload payload) {
SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
actorContext.getReplicatedLog().append(newEntry);
return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
sendReplicate(actorContext, lastIndex + i + 1);
leader.handleMessage(followerActor, new AppendEntriesReply(
FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
-
}
- for (int i = 3; i < 5; i++) {
- sendReplicate(actorContext, lastIndex + i + 1);
+ // We are expecting six messages here -- a request to replicate and a consensus-reached message
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
+ for (int i = 0; i < 3; i++) {
+ assertRequestEntry(lastIndex, allMessages, i);
+ assertCommitEntry(lastIndex, allMessages, i);
}
- List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
- // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
- // get sent to the follower - but not the 5th
- assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+ // Now perform another commit, eliciting a request to persist
+ sendReplicate(actorContext, lastIndex + 3 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // This elicits another message for request to replicate
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ assertRequestEntry(lastIndex, allMessages, 3);
- for (int i = 0; i < 4; i++) {
- long expected = allMessages.get(i).getEntries().get(0).getIndex();
- assertEquals(expected, i + 2);
- }
+ sendReplicate(actorContext, lastIndex + 4 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ }
+
+ private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
+ assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
+ assertEquals(ImmutableList.of(), commitReq.getEntries());
+ }
+
+ private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries req = allMessages.get(2 * messageNr);
+ assertEquals(lastIndex + messageNr, req.getLeaderCommit());
+
+ final List<ReplicatedLogEntry> entries = req.getEntries();
+ assertEquals(1, entries.size());
+ assertEquals(messageNr + 2, entries.get(0).getIndex());
}
@Test