import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
- private Cancellable heartbeatSchedule = null;
-
- private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
-
- private int minReplicationCount;
+ /**
+ * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+ * expect the entries to be modified in sequence, hence we open-code the lookup.
+ *
+ * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+ * but we already expect those to be far from frequent.
+ */
+ private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
+ private Cancellable heartbeatSchedule = null;
private Optional<SnapshotHolder> snapshot;
+ private int minReplicationCount;
- public AbstractLeader(RaftActorContext context) {
- super(context, RaftState.Leader);
-
- setLeaderPayloadVersion(context.getPayloadVersion());
+ protected AbstractLeader(RaftActorContext context, RaftState state) {
+ super(context, state);
for(PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
followerToLog.put(peerInfo.getId(), followerLogInformation);
}
- leaderId = context.getId();
-
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
public void removeFollower(String followerId) {
followerToLog.remove(followerId);
+ mapFollowerToSnapshot.remove(followerId);
}
public void updateMinReplicaCount() {
followerLogInformation.markFollowerActive();
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
+ followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
boolean updated = false;
if (appendEntriesReply.isSuccess()) {
// If there exists an N such that N > commitIndex, a majority
// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
// set commitIndex = N (§5.3, §5.4).
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+ logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
+ }
+
for (long N = context.getCommitIndex() + 1; ; N++) {
int replicatedCount = 1;
+ LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
- if ((info.getMatchIndex() >= N) && (context.getPeerInfo(followerId).isVoting())) {
+ final PeerInfo peerInfo = context.getPeerInfo(info.getId());
+ if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
replicatedCount++;
+ } else if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+ info.getMatchIndex(), peerInfo);
}
}
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount);
+ }
+
if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
+ if (replicatedLogEntry == null) {
+ LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size());
+ break;
+ }
+
+ // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+ // "Raft never commits log entries from previous terms by counting replicas".
+ // However we keep looping so we can make progress when new entries in the current term
+ // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+ // counting replicas, then all prior entries are committed indirectly".
+ if (replicatedLogEntry.getTerm() == currentTerm()) {
+ LOG.trace("{}: Setting commit index to {}", logName(), N);
context.setCommitIndex(N);
} else {
- break;
+ LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}",
+ logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
}
} else {
+ LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
break;
}
}
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
- final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ final Iterator<ClientRequestTracker> it = trackers.iterator();
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
if (t.getIndex() == logIndex) {
return null;
}
- @Override
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- for (ClientRequestTracker tracker : trackerList) {
- if (tracker.getIndex() == logIndex) {
- return tracker;
- }
- }
- return null;
- }
-
@Override
protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
beforeSendHeartbeat();
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- return this;
-
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
-
} else if (message instanceof Replicate) {
replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
+ } else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+ } else {
+ return super.handleMessage(sender, message);
}
-
- return super.handleMessage(sender, message);
+ return this;
}
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
- LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
logName(), followerId);
return;
}
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ if(followerLogInformation == null) {
+ // This can happen during AddServer if it times out.
+ LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
+ mapFollowerToSnapshot.remove(followerId);
+ return;
+ }
+
followerLogInformation.markFollowerActive();
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
// Create a tracker entry we will use this later to notify the
// client actor
- trackerList.add(
+ trackers.add(
new ClientRequestTrackerImpl(replicate.getClientActor(),
replicate.getIdentifier(),
logIndex)
}
}
- private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+ protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
// Send an AppendEntries to all followers
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
appendEntries);
}
- followerActor.tell(appendEntries.toSerializable(), actor());
+ followerActor.tell(appendEntries, actor());
}
/**
 * Initiates a snapshot install on the given follower. If a snapshot is already present
 * in memory it is sent in chunks to the follower directly; otherwise a snapshot capture
 * is requested from the SnapshotManager.
 *
 * @param followerId the id of the follower on which to install a snapshot
 * @return true if snapshot installation was initiated, false otherwise
 */
- public void initiateCaptureSnapshot(String followerId) {
+ public boolean initiateCaptureSnapshot(String followerId) {
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot.
// This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
-
-
+ return true;
} else {
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
this.getReplicatedToAllIndex(), followerId);
}
}
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
+ byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
followerToSnapshot.incrementChunkIndex(),
followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
- ).toSerializable(),
+ ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
);
 * Accepts the snapshot as a ByteString, caches a per-follower tracker in the map so
 * subsequent chunks continue from the right offset, and creates and returns the next
 * chunk as a byte array.
 */
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
- ByteString nextChunk = followerToSnapshot.getNextChunk();
+ byte[] nextChunk = followerToSnapshot.getNextChunk();
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
return nextChunk;
}
// need to be sent if there are other messages being sent to the remote
// actor.
heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
+ interval, context.getActor(), SendHeartBeat.INSTANCE,
context.getActorSystem().dispatcher(), context.getActor());
}
@Override
- public void close() throws Exception {
+ public void close() {
stopHeartBeat();
}
@Override
- public String getLeaderId() {
+ public final String getLeaderId() {
return context.getId();
}
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return context.getPayloadVersion();
+ }
+
protected boolean isLeaderIsolated() {
int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
- if (followerLogInformation.isFollowerActive()) {
+ final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+ if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
--minPresent;
if (minPresent == 0) {
- break;
+ return false;
}
}
}
}
}
- public ByteString getNextChunk() {
+ public byte[] getNextChunk() {
int snapshotLength = getSnapshotBytes().size();
int start = incrementOffset();
int size = context.getConfigParams().getSnapshotChunkSize();
if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
+ } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+ size = snapshotLength - start;
}
+ byte[] nextChunk = new byte[size];
+ getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+ nextChunkHashCode = Arrays.hashCode(nextChunk);
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
- snapshotLength, start, size);
-
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
+ snapshotLength, start, size, nextChunkHashCode);
+ return nextChunk;
}
/**