import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import scala.concurrent.duration.FiniteDuration;
/**
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
- private Cancellable heartbeatSchedule = null;
-
- private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
+ /**
+ * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+ * expect the entries to be modified in sequence, hence we open-code the lookup.
+ *
+ * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+ * but we already expect those to be far from frequent.
+ */
+ private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
+ private Cancellable heartbeatSchedule = null;
+ private Optional<SnapshotHolder> snapshot = Optional.absent();
private int minReplicationCount;
- private Optional<SnapshotHolder> snapshot;
+ protected AbstractLeader(RaftActorContext context, RaftState state,
+ @Nullable AbstractLeader initializeFromLeader) {
+ super(context, state);
- public AbstractLeader(RaftActorContext context) {
- super(context, RaftState.Leader);
-
- setLeaderPayloadVersion(context.getPayloadVersion());
-
- for(PeerInfo peerInfo: context.getPeers()) {
- FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
- followerToLog.put(peerInfo.getId(), followerLogInformation);
+ if(initializeFromLeader != null) {
+ followerToLog.putAll(initializeFromLeader.followerToLog);
+ mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot);
+ snapshot = initializeFromLeader.snapshot;
+ trackers.addAll(initializeFromLeader.trackers);
+ } else {
+ for(PeerInfo peerInfo: context.getPeers()) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+ followerToLog.put(peerInfo.getId(), followerLogInformation);
+ }
}
- leaderId = context.getId();
-
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
- snapshot = Optional.absent();
-
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
+ protected AbstractLeader(RaftActorContext context, RaftState state) {
+ this(context, state, null);
+ }
+
/**
* Return an immutable collection of follower identifiers.
*
public void removeFollower(String followerId) {
followerToLog.remove(followerId);
+ mapFollowerToSnapshot.remove(followerId);
}
public void updateMinReplicaCount() {
followerLogInformation.markFollowerActive();
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
+ followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
boolean updated = false;
- if (appendEntriesReply.isSuccess()) {
+ if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
+ // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
+ // in raft as a node cannot become leader if it's log is behind another's. However, the
+ // non-voting semantics deviate a bit from raft. Only voting members participate in
+ // elections and can become leader so it's possible for a non-voting follower to be ahead
+ // of the leader. This can happen if persistence is disabled and all voting members are
+ // restarted. In this case, the voting leader will start out with an empty log however
+ // the non-voting followers still retain the previous data in memory. On the first
+ // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex
+ // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned
+ // lastLogIndex may be higher in which case we want to reset the follower by installing a
+ // snapshot. It's also possible that the follower's last log index is behind the leader's.
+ // However in this case the log terms won't match and the logs will conflict - this is handled
+ // elsewhere.
+ LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snapshot",
+ logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
+ context.getReplicatedLog().lastIndex());
+
+ followerLogInformation.setMatchIndex(-1);
+ followerLogInformation.setNextIndex(-1);
+
+ initiateCaptureSnapshot(followerId);
+ updated = true;
+ } else if (appendEntriesReply.isSuccess()) {
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
} else {
LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
- ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
+ long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex);
if(appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// in common with this Leader and so would require a snapshot to be installed
// Force initiate a snapshot capture
initiateCaptureSnapshot(followerId);
- } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
- followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
+ } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 &&
+ followersLastLogTerm == appendEntriesReply.getLogLastTerm())) {
// The follower's log is empty or the last entry is present in the leader's journal
// and the terms match so the follower is just behind the leader's journal from
// the last snapshot, if any. We'll catch up the follower quickly by starting at the
updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
} else {
- // TODO: When we find that the follower is out of sync with the
- // Leader we simply decrement that followers next index by 1.
- // Would it be possible to do better than this? The RAFT spec
- // does not explicitly deal with it but may be something for us to
- // think about.
+ // The follower's log conflicts with leader's log so decrement follower's next index by 1
+ // in an attempt to find where the logs match.
+
+ LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index",
+ logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm);
followerLogInformation.decrNextIndex();
}
// If there exists an N such that N > commitIndex, a majority
// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
// set commitIndex = N (§5.3, §5.4).
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+ logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
+ }
+
for (long N = context.getCommitIndex() + 1; ; N++) {
int replicatedCount = 1;
+ LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(info.getId());
- if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
+ if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) {
replicatedCount++;
+ } else if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+ info.getMatchIndex(), peerInfo);
}
}
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount);
+ }
+
if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
if (replicatedLogEntry == null) {
+ LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size());
break;
}
// reach consensus, as per §5.4.1: "once an entry from the current term is committed by
// counting replicas, then all prior entries are committed indirectly".
if (replicatedLogEntry.getTerm() == currentTerm()) {
+ LOG.trace("{}: Setting commit index to {}", logName(), N);
context.setCommitIndex(N);
+ } else {
+ LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}",
+ logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
}
} else {
+ LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
break;
}
}
//Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
return this;
}
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
- final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ final Iterator<ClientRequestTracker> it = trackers.iterator();
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
if (t.getIndex() == logIndex) {
return null;
}
- @Override
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- for (ClientRequestTracker tracker : trackerList) {
- if (tracker.getIndex() == logIndex) {
- return tracker;
- }
- }
- return null;
- }
-
@Override
protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
protected void beforeSendHeartbeat(){}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
- Object message = fromSerializableMessage(originalMessage);
-
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
beforeSendHeartbeat();
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- return this;
-
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
-
} else if (message instanceof Replicate) {
replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
+ } else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+ } else {
+ return super.handleMessage(sender, message);
}
-
- return super.handleMessage(sender, message);
+ return this;
}
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
- replicate.getIdentifier(), logIndex);
+ LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
+ replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
// Create a tracker entry we will use this later to notify the
// client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
+ if(replicate.getClientActor() != null) {
+ trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
+ logIndex));
+ }
- boolean applyModificationToState = followerToLog.isEmpty()
+ boolean applyModificationToState = !context.anyVotingPeers()
|| context.getRaftPolicy().applyModificationToStateBeforeConsensus();
if(applyModificationToState){
// then snapshot should be sent
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+ LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " +
"follower-nextIndex: %d, leader-snapshot-index: %d, " +
"leader-last-index: %d", logName(), followerId,
followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
List<ReplicatedLogEntry> entries, String followerId) {
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
+ getLogEntryIndex(followerNextIndex - 1),
+ getLogEntryTerm(followerNextIndex - 1), entries,
context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
if(!entries.isEmpty() || LOG.isTraceEnabled()) {
appendEntries);
}
- followerActor.tell(appendEntries.toSerializable(), actor());
+ followerActor.tell(appendEntries, actor());
}
/**
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return (nextIndex == -1 ||
+ return nextIndex == -1 ||
(!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex)));
+ && context.getReplicatedLog().isInSnapshot(nextIndex));
}
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
+ byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
+ Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+ if(followerToSnapshot.isLastChunk(nextChunkIndex)) {
+ serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+ }
+
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
snapshot.get().getLastIncludedIndex(),
snapshot.get().getLastIncludedTerm(),
nextSnapshotChunk,
- followerToSnapshot.incrementChunkIndex(),
+ nextChunkIndex,
followerToSnapshot.getTotalChunks(),
- Optional.of(followerToSnapshot.getLastChunkHashCode())
- ).toSerializable(),
+ Optional.of(followerToSnapshot.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
);
* Acccepts snaphot as ByteString, enters into map for future chunks
* creates and return a ByteString chunk
*/
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
- ByteString nextChunk = followerToSnapshot.getNextChunk();
+ byte[] nextChunk = followerToSnapshot.getNextChunk();
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
return nextChunk;
}
// need to be sent if there are other messages being sent to the remote
// actor.
heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
+ interval, context.getActor(), SendHeartBeat.INSTANCE,
context.getActorSystem().dispatcher(), context.getActor());
}
@Override
- public void close() throws Exception {
+ public void close() {
stopHeartBeat();
}
@Override
- public String getLeaderId() {
+ public final String getLeaderId() {
return context.getId();
}
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return context.getPayloadVersion();
+ }
+
protected boolean isLeaderIsolated() {
int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
- if (followerLogInformation.isFollowerActive()) {
+ final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+ if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
--minPresent;
if (minPresent == 0) {
- break;
+ return false;
}
}
}
- return (minPresent != 0);
+ return minPresent != 0;
}
/**
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+ totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
+ (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
logName(), size, totalChunks);
}
}
- public ByteString getNextChunk() {
+ public byte[] getNextChunk() {
int snapshotLength = getSnapshotBytes().size();
int start = incrementOffset();
int size = context.getConfigParams().getSnapshotChunkSize();
if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
+ } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+ size = snapshotLength - start;
}
+ byte[] nextChunk = new byte[size];
+ getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+ nextChunkHashCode = Arrays.hashCode(nextChunk);
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
- snapshotLength, start, size);
-
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
+ snapshotLength, start, size, nextChunkHashCode);
+ return nextChunk;
}
/**