import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import scala.concurrent.duration.FiniteDuration;
/**
* set commitIndex = N (§5.3, §5.4).
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
-
- // The index of the first chunk that is sent when installing a snapshot
- public static final int FIRST_CHUNK_INDEX = 1;
-
- // The index that the follower should respond with if it needs the install snapshot to be reset
- public static final int INVALID_CHUNK_INDEX = -1;
-
- // This would be passed as the hash code of the last chunk when sending the first chunk
- public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
-
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
/**
* Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
private Cancellable heartbeatSchedule = null;
- private Optional<SnapshotHolder> snapshot;
+ private Optional<SnapshotHolder> snapshot = Optional.absent();;
private int minReplicationCount;
- protected AbstractLeader(RaftActorContext context, RaftState state) {
+ protected AbstractLeader(RaftActorContext context, RaftState state,
+ @Nullable AbstractLeader initializeFromLeader) {
super(context, state);
- for(PeerInfo peerInfo: context.getPeers()) {
- FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
- followerToLog.put(peerInfo.getId(), followerLogInformation);
+ if(initializeFromLeader != null) {
+ followerToLog.putAll(initializeFromLeader.followerToLog);
+ snapshot = initializeFromLeader.snapshot;
+ trackers.addAll(initializeFromLeader.trackers);
+ } else {
+ for(PeerInfo peerInfo: context.getPeers()) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+ followerToLog.put(peerInfo.getId(), followerLogInformation);
+ }
}
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
- snapshot = Optional.absent();
-
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
+ protected AbstractLeader(RaftActorContext context, RaftState state) {
+ this(context, state, null);
+ }
+
/**
* Return an immutable collection of follower identifiers.
*
public void removeFollower(String followerId) {
followerToLog.remove(followerId);
- mapFollowerToSnapshot.remove(followerId);
}
public void updateMinReplicaCount() {
}
}
+ @VisibleForTesting
+ boolean hasSnapshot() {
+ return snapshot.isPresent();
+ }
+
@Override
protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
boolean updated = false;
- if (appendEntriesReply.isSuccess()) {
+ if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
+ // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
+ // in raft as a node cannot become leader if it's log is behind another's. However, the
+ // non-voting semantics deviate a bit from raft. Only voting members participate in
+ // elections and can become leader so it's possible for a non-voting follower to be ahead
+ // of the leader. This can happen if persistence is disabled and all voting members are
+ // restarted. In this case, the voting leader will start out with an empty log however
+ // the non-voting followers still retain the previous data in memory. On the first
+ // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex
+ // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned
+ // lastLogIndex may be higher in which case we want to reset the follower by installing a
+ // snapshot. It's also possible that the follower's last log index is behind the leader's.
+ // However in this case the log terms won't match and the logs will conflict - this is handled
+ // elsewhere.
+ LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot",
+ logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
+ context.getReplicatedLog().lastIndex());
+
+ followerLogInformation.setMatchIndex(-1);
+ followerLogInformation.setNextIndex(-1);
+
+ initiateCaptureSnapshot(followerId);
+ updated = true;
+ } else if (appendEntriesReply.isSuccess()) {
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
} else {
LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
- ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
+ long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex);
if(appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// in common with this Leader and so would require a snapshot to be installed
// Force initiate a snapshot capture
initiateCaptureSnapshot(followerId);
- } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
- followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
+ } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 &&
+ followersLastLogTerm == appendEntriesReply.getLogLastTerm())) {
// The follower's log is empty or the last entry is present in the leader's journal
// and the terms match so the follower is just behind the leader's journal from
// the last snapshot, if any. We'll catch up the follower quickly by starting at the
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
} else {
- // TODO: When we find that the follower is out of sync with the
- // Leader we simply decrement that followers next index by 1.
- // Would it be possible to do better than this? The RAFT spec
- // does not explicitly deal with it but may be something for us to
- // think about.
+ // The follower's log conflicts with leader's log so decrement follower's next index by 1
+ // in an attempt to find where the logs match.
+
+ LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index",
+ logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm);
followerLogInformation.decrNextIndex();
}
LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(info.getId());
- if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
+ if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) {
replicatedCount++;
} else if(LOG.isTraceEnabled()) {
LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
//Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
return this;
}
protected void beforeSendHeartbeat(){}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
- Object message = fromSerializableMessage(originalMessage);
-
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-
- if (followerToSnapshot == null) {
- LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
- logName(), followerId);
- return;
- }
-
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
if(followerLogInformation == null) {
// This can happen during AddServer if it times out.
LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
logName(), followerId);
- mapFollowerToSnapshot.remove(followerId);
+ return;
+ }
+
+ LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+ if (installSnapshotState == null) {
+ LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
return;
}
followerLogInformation.markFollowerActive();
- if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+ if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
boolean wasLastChunk = false;
if (reply.isSuccess()) {
- if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+ if(installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshotReply received, " +
long followerMatchIndex = snapshot.get().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
- mapFollowerToSnapshot.remove(followerId);
+ followerLogInformation.clearLeaderInstallSnapshotState();
LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
logName(), followerId, followerLogInformation.getMatchIndex(),
followerLogInformation.getNextIndex());
- if (mapFollowerToSnapshot.isEmpty()) {
+ if (!anyFollowersInstallingSnapshot()) {
// once there are no pending followers receiving snapshots
// we can remove snapshot from the memory
setSnapshot(null);
}
+
wasLastChunk = true;
if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self");
}
} else {
- followerToSnapshot.markSendStatus(true);
+ installSnapshotState.markSendStatus(true);
}
} else {
LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
logName(), reply.getChunkIndex());
- followerToSnapshot.markSendStatus(false);
+ installSnapshotState.markSendStatus(false);
}
if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
// Since the follower is now caught up try to purge the log.
purgeInMemoryLog();
- } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+ } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if(followerActor != null) {
sendSnapshotChunk(followerActor, followerId);
} else {
LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
logName(), reply.getChunkIndex(), followerId,
- followerToSnapshot.getChunkIndex());
+ installSnapshotState.getChunkIndex());
- if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+ if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
// Since the Follower did not find this index to be valid we should reset the follower snapshot
// so that Installing the snapshot can resume from the beginning
- followerToSnapshot.reset();
+ installSnapshotState.reset();
}
}
}
+ private boolean anyFollowersInstallingSnapshot() {
+ for(FollowerLogInformation info: followerToLog.values()) {
+ if(info.getInstallSnapshotState() != null) {
+ return true;
+ }
+
+ }
+
+ return false;
+ }
+
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
- replicate.getIdentifier(), logIndex);
+ LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
+ replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
// Create a tracker entry we will use this later to notify the
// client actor
- trackers.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
+ if(replicate.getClientActor() != null) {
+ trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
+ logIndex));
+ }
- boolean applyModificationToState = followerToLog.isEmpty()
+ boolean applyModificationToState = !context.anyVotingPeers()
|| context.getRaftPolicy().applyModificationToStateBeforeConsensus();
if(applyModificationToState){
boolean sendAppendEntries = false;
List<ReplicatedLogEntry> entries = Collections.emptyList();
- if (mapFollowerToSnapshot.get(followerId) != null) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+ if (installSnapshotState != null) {
// if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
sendSnapshotChunk(followerActor, followerId);
} else if(sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
// then snapshot should be sent
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+ LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " +
"follower-nextIndex: %d, leader-snapshot-index: %d, " +
"leader-last-index: %d", logName(), followerId,
followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
List<ReplicatedLogEntry> entries, String followerId) {
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
+ getLogEntryIndex(followerNextIndex - 1),
+ getLogEntryTerm(followerNextIndex - 1), entries,
context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
if(!entries.isEmpty() || LOG.isTraceEnabled()) {
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return (nextIndex == -1 ||
+ return nextIndex == -1 ||
(!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex)));
+ && context.getReplicatedLog().isInSnapshot(nextIndex));
}
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
- int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
+ int nextChunkIndex = installSnapshotState.incrementChunkIndex();
Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
- if(followerToSnapshot.isLastChunk(nextChunkIndex)) {
+ if(installSnapshotState.isLastChunk(nextChunkIndex)) {
serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
}
snapshot.get().getLastIncludedTerm(),
nextSnapshotChunk,
nextChunkIndex,
- followerToSnapshot.getTotalChunks(),
- Optional.of(followerToSnapshot.getLastChunkHashCode()),
+ installSnapshotState.getTotalChunks(),
+ Optional.of(installSnapshotState.getLastChunkHashCode()),
serverConfig
).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
if(LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- logName(), followerActor.path(), followerToSnapshot.getChunkIndex(),
- followerToSnapshot.getTotalChunks());
+ logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
+ installSnapshotState.getTotalChunks());
}
}
} catch (IOException e) {
* creates and return a ByteString chunk
*/
private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot == null) {
- followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
- mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+ LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
+ if (installSnapshotState == null) {
+ installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
+ logName());
+ followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState);
}
- byte[] nextChunk = followerToSnapshot.getNextChunk();
+ byte[] nextChunk = installSnapshotState.getNextChunk();
LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
}
}
}
- return (minPresent != 0);
- }
-
- /**
- * Encapsulates the snapshot bytestring and handles the logic of sending
- * snapshot chunks
- */
- protected class FollowerToSnapshot {
- private final ByteString snapshotBytes;
- private int offset = 0;
- // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
- // if replyStatus is false, the previous chunk is attempted
- private boolean replyStatus = false;
- private int chunkIndex;
- private final int totalChunks;
- private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-
- public FollowerToSnapshot(ByteString snapshotBytes) {
- this.snapshotBytes = snapshotBytes;
- int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
- logName(), size, totalChunks);
- }
- replyReceivedForOffset = -1;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- }
-
- public ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
- public int incrementOffset() {
- if(replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- offset = offset + context.getConfigParams().getSnapshotChunkSize();
- }
- return offset;
- }
-
- public int incrementChunkIndex() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- chunkIndex = chunkIndex + 1;
- }
- return chunkIndex;
- }
-
- public int getChunkIndex() {
- return chunkIndex;
- }
-
- public int getTotalChunks() {
- return totalChunks;
- }
-
- public boolean canSendNextChunk() {
- // we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
- }
-
- public boolean isLastChunk(int chunkIndex) {
- return totalChunks == chunkIndex;
- }
-
- public void markSendStatus(boolean success) {
- if (success) {
- // if the chunk sent was successful
- replyReceivedForOffset = offset;
- replyStatus = true;
- lastChunkHashCode = nextChunkHashCode;
- } else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
- replyStatus = false;
- }
- }
-
- public byte[] getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = context.getConfigParams().getSnapshotChunkSize();
- if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
- size = snapshotLength;
- } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
-
- byte[] nextChunk = new byte[size];
- getSnapshotBytes().copyTo(nextChunk, start, 0, size);
- nextChunkHashCode = Arrays.hashCode(nextChunk);
-
- LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
- snapshotLength, start, size, nextChunkHashCode);
- return nextChunk;
- }
-
- /**
- * reset should be called when the Follower needs to be sent the snapshot from the beginning
- */
- public void reset(){
- offset = 0;
- replyStatus = false;
- replyReceivedForOffset = offset;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- }
-
- public int getLastChunkHashCode() {
- return lastChunkHashCode;
- }
+ return minPresent != 0;
}
// called from example-actor for printing the follower-states
return followerToLog.get(followerId);
}
- @VisibleForTesting
- protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
- mapFollowerToSnapshot.put(followerId, snapshot);
- }
-
- @VisibleForTesting
- public int followerSnapshotSize() {
- return mapFollowerToSnapshot.size();
- }
-
@VisibleForTesting
public int followerLogSize() {
return followerToLog.size();