import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
protected final int minIsolatedLeaderPeerCount;
- private Optional<ByteString> snapshot;
+ private Optional<SnapshotHolder> snapshot;
public AbstractLeader(RaftActorContext context) {
super(context, RaftState.Leader);
+ setLeaderPayloadVersion(context.getPayloadVersion());
+
final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
}
@VisibleForTesting
- void setSnapshot(Optional<ByteString> snapshot) {
- this.snapshot = snapshot;
+ void setSnapshot(@Nullable Snapshot snapshot) {
+ if(snapshot != null) {
+ this.snapshot = Optional.of(new SnapshotHolder(snapshot));
+ } else {
+ this.snapshot = Optional.absent();
+ }
}
@Override
boolean updated = false;
if (appendEntriesReply.isSuccess()) {
- updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
- updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
-
- if(updated && LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(),
- followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
- }
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
} else {
LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
- // TODO: When we find that the follower is out of sync with the
- // Leader we simply decrement that followers next index by 1.
- // Would it be possible to do better than this? The RAFT spec
- // does not explicitly deal with it but may be something for us to
- // think about
+ long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+ ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
+ if(appendEntriesReply.isForceInstallSnapshot()) {
+ // Reset the followers match and next index. This is to signal that this follower has nothing
+ // in common with this Leader and so would require a snapshot to be installed
+ followerLogInformation.setMatchIndex(-1);
+ followerLogInformation.setNextIndex(-1);
+
+ // Force initiate a snapshot capture
+ initiateCaptureSnapshot(followerId);
+ } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
+ followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
+ // The follower's log is empty or the last entry is present in the leader's journal
+ // and the terms match so the follower is just behind the leader's journal from
+ // the last snapshot, if any. We'll catch up the follower quickly by starting at the
+ // follower's last log index.
+
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+ } else {
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about.
- followerLogInformation.decrNextIndex();
+ followerLogInformation.decrNextIndex();
+ }
}
// Now figure out if this reply warrants a change in the commitIndex
return this;
}
+ private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
+ AppendEntriesReply appendEntriesReply) {
+ boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
+ updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
+
+ if(updated && LOG.isDebugEnabled()) {
+ LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
+ logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
+ }
+ return updated;
+ }
+
private void purgeInMemoryLog() {
//find the lowest index across followers which has been replicated to all.
// lastApplied if there are no followers, so that we keep clearing the log for single-node
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return switchBehavior(new Follower(context));
+ return internalSwitchBehavior(RaftState.Follower);
}
}
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
} else if (message instanceof Replicate) {
);
}
- followerLogInformation.setMatchIndex(
- context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation.setNextIndex(
- context.getReplicatedLog().getSnapshotIndex() + 1);
+ long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+ followerLogInformation.setMatchIndex(followerMatchIndex);
+ followerLogInformation.setNextIndex(followerMatchIndex + 1);
mapFollowerToSnapshot.remove(followerId);
LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
if (mapFollowerToSnapshot.isEmpty()) {
// once there are no pending followers receiving snapshots
// we can remove snapshot from the memory
- setSnapshot(Optional.<ByteString>absent());
+ setSnapshot(null);
}
wasLastChunk = true;
logIndex)
);
- if (followerToLog.isEmpty()) {
+ boolean applyModificationToState = followerToLog.isEmpty()
+ || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
+
+ if(applyModificationToState){
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
- } else {
+ }
+
+ if (!followerToLog.isEmpty()) {
sendAppendEntries(0, false);
}
}
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
- LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
- logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+ LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+ logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
}
if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
followerNextIndex, followerId);
- // FIXME : Sending one entry at a time
if(followerLogInformation.okToReplicate()) {
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+ // Try to send all the entries in the journal but not exceeding the max data size
+ // for a single AppendEntries message.
+ int maxEntries = (int) context.getReplicatedLog().size();
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
+ context.getConfigParams().getSnapshotChunkSize());
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0 &&
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
- initiateCaptureSnapshot(followerId, followerNextIndex);
+ if (canInstallSnapshot(followerNextIndex)) {
+ initiateCaptureSnapshot(followerId);
+ }
} else if(sendHeartbeat) {
// we send an AppendEntries, even if the follower is inactive
* 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
* then send the existing snapshot in chunks to the follower.
* @param followerId
- * @param followerNextIndex
*/
- private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
- if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
- context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
-
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
- final ActorSelection followerActor = context.getPeerActorSelection(followerId);
- sendSnapshotChunk(followerActor, followerId);
+ private void initiateCaptureSnapshot(String followerId) {
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in the memory, most likely another install is in progress
+ // no need to capture snapshot.
+ // This could happen if another follower needs an install when one is going on.
+ final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ sendSnapshotChunk(followerActor, followerId);
- } else {
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
- }
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
+ private boolean canInstallSnapshot(long nextIndex){
+ // If the follower's nextIndex is -1 then we might as well send it a snapshot
+ // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
+ // in the snapshot
+ return (nextIndex == -1 ||
+ (!context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex)));
+
+ }
+
private void sendInstallSnapshot() {
LOG.debug("{}: sendInstallSnapshot", logName());
if (followerActor != null) {
long nextIndex = e.getValue().getNextIndex();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ if (canInstallSnapshot(nextIndex)) {
sendSnapshotChunk(followerActor, e.getKey());
}
}
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+ ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
+ snapshot.get().getLastIncludedIndex(),
+ snapshot.get().getLastIncludedTerm(),
nextSnapshotChunk,
followerToSnapshot.incrementChunkIndex(),
followerToSnapshot.getTotalChunks(),
public int followerLogSize() {
return followerToLog.size();
}
+
+ private static class SnapshotHolder {
+ private final long lastIncludedTerm;
+ private final long lastIncludedIndex;
+ private final ByteString snapshotBytes;
+
+ SnapshotHolder(Snapshot snapshot) {
+ this.lastIncludedTerm = snapshot.getLastAppliedTerm();
+ this.lastIncludedIndex = snapshot.getLastAppliedIndex();
+ this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+ }
+
+ long getLastIncludedTerm() {
+ return lastIncludedTerm;
+ }
+
+ long getLastIncludedIndex() {
+ return lastIncludedIndex;
+ }
+
+ ByteString getSnapshotBytes() {
+ return snapshotBytes;
+ }
+ }
}