import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import java.util.Map.Entry;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
private final Map<String, FollowerLogInformation> followerToLog;
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
- protected final Set<String> followers;
-
private Cancellable heartbeatSchedule = null;
- private List<ClientRequestTracker> trackerList = new ArrayList<>();
+ private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
protected final int minReplicationCount;
private Optional<ByteString> snapshot;
+ private long replicatedToAllIndex = -1;
+
public AbstractLeader(RaftActorContext context) {
super(context);
- followers = context.getPeerAddresses().keySet();
-
final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
- for (String followerId : followers) {
+ for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId,
context.getCommitIndex(), -1,
leaderId = context.getId();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Leader has following peers: {}", followers);
- }
+ LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
- minReplicationCount = getMajorityVoteCount(followers.size());
+ minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
// the isolated Leader peer count will be 1 less than the majority vote count.
// this is because the vote count has the self vote counted in it
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (§5.2)
- scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ sendAppendEntries(0);
+ }
+
+ /**
+ * Return an immutable collection of follower identifiers.
+ *
+ * @return Collection of follower IDs
+ */
+ protected final Collection<String> getFollowerIds() {
+ return followerToLog.keySet();
}
private Optional<ByteString> getSnapshot() {
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
return this;
if(! appendEntriesReply.isSuccess()) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntriesReply.toString());
+ LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
}
}
followerToLog.get(followerId);
if(followerLogInformation == null){
- LOG.error("Unknown follower {}", followerId);
+ LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
return this;
}
int replicatedCount = 1;
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex().get() >= N) {
+ if (info.getMatchIndex() >= N) {
replicatedCount++;
}
}
applyLogToStateMachine(context.getCommitIndex());
}
+ if (!context.isSnapshotCaptureInitiated()) {
+ purgeInMemoryLog();
+ }
+
+ //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+ sendUpdatesToFollower(followerId, followerLogInformation, false);
return this;
}
- protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+ private void purgeInMemoryLog() {
+ //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ // we would delete the in-mem log from that index on, in-order to minimize mem usage
+ // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ for (FollowerLogInformation info : followerToLog.values()) {
+ minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+ }
+
+ replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ }
- ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
- if(toRemove != null) {
- trackerList.remove(toRemove);
+ @Override
+ protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+ final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ while (it.hasNext()) {
+ final ClientRequestTracker t = it.next();
+ if (t.getIndex() == logIndex) {
+ it.remove();
+ return t;
+ }
}
- return toRemove;
+ return null;
}
+ @Override
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
for (ClientRequestTracker tracker : trackerList) {
if (tracker.getIndex() == logIndex) {
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+ rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
return switchBehavior(new Follower(context));
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
String followerId = reply.getFollowerId();
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+ if (followerToSnapshot == null) {
+ LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ context.getId(), followerId);
+ return;
+ }
+
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
followerLogInformation.markFollowerActive();
- if (followerToSnapshot != null &&
- followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+ if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+ boolean wasLastChunk = false;
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshotReply received, " +
+ LOG.debug("{}: InstallSnapshotReply received, " +
"last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
+ context.getId(), reply.getChunkIndex(), followerId,
context.getReplicatedLog().getSnapshotIndex() + 1
);
}
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
+ LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" +
+ context.getId(), followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
// we can remove snapshot from the memory
setSnapshot(Optional.<ByteString>absent());
}
+ wasLastChunk = true;
} else {
followerToSnapshot.markSendStatus(true);
}
} else {
- LOG.info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex()
- );
+ LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+ context.getId(), reply.getChunkIndex());
followerToSnapshot.markSendStatus(false);
}
+ if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if(followerActor != null) {
+ sendSnapshotChunk(followerActor, followerId);
+ }
+ }
+
} else {
- LOG.error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
- );
+ LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+ context.getId(), reply.getChunkIndex(), followerId,
+ followerToSnapshot.getChunkIndex());
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
// Since the Follower did not find this index to be valid we should reset the follower snapshot
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message {}", logIndex);
+ LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
}
// Create a tracker entry we will use this later to notify the
logIndex)
);
- if (followers.size() == 0) {
+ if (followerToLog.isEmpty()) {
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
} else {
- sendAppendEntries();
+ sendAppendEntries(0);
}
}
- private void sendAppendEntries() {
+ private void sendAppendEntries(long timeSinceLastActivityInterval) {
// Send an AppendEntries to all followers
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ final String followerId = e.getKey();
+ final FollowerLogInformation followerLogInformation = e.getValue();
+ // This checks helps not to send a repeat message to the follower
+ if(!followerLogInformation.isFollowerActive() ||
+ followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
+ sendUpdatesToFollower(followerId, followerLogInformation, true);
+ }
+ }
+ }
- if (followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex().get();
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
- List<ReplicatedLogEntry> entries = null;
-
- if (mapFollowerToSnapshot.get(followerId) != null) {
- // if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- } else {
- // we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList());
- }
+ /**
+ *
+ * This method checks if any update needs to be sent to the given follower. This includes append log entries,
+ * sending next snapshot chunk, and initiating a snapshot.
+ * @return true if any update is sent, false otherwise
+ */
- } else {
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-
- if (isFollowerActive &&
- context.getReplicatedLog().isPresent(followerNextIndex)) {
- // FIXME : Sending one entry at a time
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- } else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex ) {
- // if the followers next index is not present in the leaders log, and
- // if the follower is just not starting and if leader's index is more than followers index
- // then snapshot should be sent
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("InitiateInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
- }
- actor().tell(new InitiateInstallSnapshot(), actor());
-
- // we would want to sent AE as the capture snapshot might take time
- entries = Collections.<ReplicatedLogEntry>emptyList();
-
- } else {
- //we send an AppendEntries, even if the follower is inactive
- // in-order to update the followers timestamp, in case it becomes active again
- entries = Collections.<ReplicatedLogEntry>emptyList();
+ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
+ boolean sendHeartbeat) {
+
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if (followerActor != null) {
+ long followerNextIndex = followerLogInformation.getNextIndex();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+
+ if (mapFollowerToSnapshot.get(followerId) != null) {
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerId);
+ } else if(sendHeartbeat) {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ }
+ } else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+ if (isFollowerActive &&
+ context.getReplicatedLog().isPresent(followerNextIndex)) {
+ // FIXME : Sending one entry at a time
+ final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+ sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
+
+ } else if (isFollowerActive && followerNextIndex >= 0 &&
+ leaderLastIndex >= followerNextIndex) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("InitiateInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+ );
}
+ actor().tell(new InitiateInstallSnapshot(), actor());
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+ // Send heartbeat to follower whenever install snapshot is initiated.
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ } else if(sendHeartbeat) {
+ //we send an AppendEntries, even if the follower is inactive
+ // in-order to update the followers timestamp, in case it becomes active again
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
}
+
}
}
}
private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries) {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
+ List<ReplicatedLogEntry> entries, String followerId) {
+ AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex(), replicatedToAllIndex);
+
+ if(!entries.isEmpty()) {
+ LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+ appendEntries);
+ }
+
+ followerActor.tell(appendEntries.toSerializable(), actor());
}
/**
*
*/
private void installSnapshotIfNeeded() {
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+ }
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
- long nextIndex = followerLogInformation.getNextIndex().get();
+ if (followerActor != null) {
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", followerId);
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, e.getKey());
- } else {
+ } else if (!context.isSnapshotCaptureInitiated()) {
initiateCaptureSnapshot();
//we just need 1 follower who would need snapshot to be installed.
// when we have the snapshot captured, we would again check (in SendInstallSnapshot)
// on every install snapshot, we try to capture the snapshot.
// Once a capture is going on, another one issued will get ignored by RaftActor.
private void initiateCaptureSnapshot() {
- LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
actor());
+ context.setSnapshotCaptureInitiated(true);
}
private void sendInstallSnapshot() {
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long nextIndex = followerLogInformation.getNextIndex().get();
+ if (followerActor != null) {
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, e.getKey());
}
}
}
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
+ ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+ // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+ // followerId to the followerToSnapshot map.
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,snapshot.get()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks(),
- Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+ nextSnapshotChunk,
+ followerToSnapshot.incrementChunkIndex(),
+ followerToSnapshot.getTotalChunks(),
+ Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()
);
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ context.getId(), followerActor.path(),
+ followerToSnapshot.getChunkIndex(),
+ followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
- LOG.error(e, "InstallSnapshot failed for Leader.");
+ LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e);
}
}
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
if (LOG.isDebugEnabled()) {
- LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
}
return nextChunk;
}
private void sendHeartBeat() {
- if (followers.size() > 0) {
- sendAppendEntries();
+ if (!followerToLog.isEmpty()) {
+ sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis());
}
}
}
private void scheduleHeartBeat(FiniteDuration interval) {
- if(followers.size() == 0){
+ if (followerToLog.isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;
* snapshot chunks
*/
protected class FollowerToSnapshot {
- private ByteString snapshotBytes;
+ private final ByteString snapshotBytes;
private int offset = 0;
// the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
private int replyReceivedForOffset;
// if replyStatus is false, the previous chunk is attempted
private boolean replyStatus = false;
private int chunkIndex;
- private int totalChunks;
+ private final int totalChunks;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
+ LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
+ context.getId(), size, totalChunks);
}
replyReceivedForOffset = -1;
chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
if(LOG.isDebugEnabled()) {
- LOG.debug("length={}, offset={},size={}",
+ LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
snapshotLength, start, size);
}
ByteString substring = getSnapshotBytes().substring(start, start + size);