import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import scala.concurrent.duration.FiniteDuration;
/**
// This would be passed as the hash code of the last chunk when sending the first chunk
public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
- private final Map<String, FollowerLogInformation> followerToLog;
+ private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private Cancellable heartbeatSchedule = null;
private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
- protected final int minReplicationCount;
+ private int minReplicationCount;
- protected final int minIsolatedLeaderPeerCount;
+ private Optional<SnapshotHolder> snapshot;
- private Optional<ByteString> snapshot;
+ protected AbstractLeader(RaftActorContext context, RaftState state) {
+ super(context, state);
- public AbstractLeader(RaftActorContext context) {
- super(context, RaftState.Leader);
+ setLeaderPayloadVersion(context.getPayloadVersion());
- final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
- for (String followerId : context.getPeerAddresses().keySet()) {
- FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId, -1, context);
-
- ftlBuilder.put(followerId, followerLogInformation);
+ for(PeerInfo peerInfo: context.getPeers()) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+ followerToLog.put(peerInfo.getId(), followerLogInformation);
}
- followerToLog = ftlBuilder.build();
leaderId = context.getId();
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
- minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
-
- // the isolated Leader peer count will be 1 less than the majority vote count.
- // this is because the vote count has the self vote counted in it
- // for e.g
- // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
- // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
- // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
- minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
+ updateMinReplicaCount();
snapshot = Optional.absent();
return followerToLog.keySet();
}
+ public void addFollower(String followerId) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
+ context.getPeerInfo(followerId), -1, context);
+ followerToLog.put(followerId, followerLogInformation);
+
+ if(heartbeatSchedule == null) {
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ }
+ }
+
+ public void removeFollower(String followerId) {
+ followerToLog.remove(followerId);
+ mapFollowerToSnapshot.remove(followerId);
+ }
+
+ public void updateMinReplicaCount() {
+ int numVoting = 0;
+ for(PeerInfo peer: context.getPeers()) {
+ if(peer.isVoting()) {
+ numVoting++;
+ }
+ }
+
+ minReplicationCount = getMajorityVoteCount(numVoting);
+ }
+
+ protected int getMinIsolatedLeaderPeerCount(){
+ //the isolated Leader peer count will be 1 less than the majority vote count.
+ //this is because the vote count has the self vote counted in it
+ //for e.g
+ //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
+ //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
+ //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
+
+ return minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
+ }
+
@VisibleForTesting
- void setSnapshot(Optional<ByteString> snapshot) {
- this.snapshot = snapshot;
+ void setSnapshot(@Nullable Snapshot snapshot) {
+ if(snapshot != null) {
+ this.snapshot = Optional.of(new SnapshotHolder(snapshot));
+ } else {
+ this.snapshot = Optional.absent();
+ }
}
@Override
followerLogInformation.markFollowerActive();
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
+ followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
boolean updated = false;
if (appendEntriesReply.isSuccess()) {
long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
- if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
+ if(appendEntriesReply.isForceInstallSnapshot()) {
+ // Reset the followers match and next index. This is to signal that this follower has nothing
+ // in common with this Leader and so would require a snapshot to be installed
+ followerLogInformation.setMatchIndex(-1);
+ followerLogInformation.setNextIndex(-1);
+
+ // Force initiate a snapshot capture
+ initiateCaptureSnapshot(followerId);
+ } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
// The follower's log is empty or the last entry is present in the leader's journal
// and the terms match so the follower is just behind the leader's journal from
int replicatedCount = 1;
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex() >= N) {
+ final PeerInfo peerInfo = context.getPeerInfo(info.getId());
+ if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
replicatedCount++;
}
}
if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null &&
- replicatedLogEntry.getTerm() == currentTerm()) {
+ if (replicatedLogEntry == null) {
+ break;
+ }
+
+ // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+ // "Raft never commits log entries from previous terms by counting replicas".
+ // However we keep looping so we can make progress when new entries in the current term
+ // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+ // counting replicas, then all prior entries are committed indirectly".
+ if (replicatedLogEntry.getTerm() == currentTerm()) {
context.setCommitIndex(N);
}
} else {
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return switchBehavior(new Follower(context));
+ return internalSwitchBehavior(RaftState.Follower);
}
}
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
} else if (message instanceof Replicate) {
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
- LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
logName(), followerId);
return;
}
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ if(followerLogInformation == null) {
+ // This can happen during AddServer if it times out.
+ LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
+ mapFollowerToSnapshot.remove(followerId);
+ return;
+ }
+
followerLogInformation.markFollowerActive();
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
);
}
- followerLogInformation.setMatchIndex(
- context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation.setNextIndex(
- context.getReplicatedLog().getSnapshotIndex() + 1);
+ long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+ followerLogInformation.setMatchIndex(followerMatchIndex);
+ followerLogInformation.setNextIndex(followerMatchIndex + 1);
mapFollowerToSnapshot.remove(followerId);
LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
- logName(), followerId, followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ logName(), followerId, followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
if (mapFollowerToSnapshot.isEmpty()) {
// once there are no pending followers receiving snapshots
// we can remove snapshot from the memory
- setSnapshot(Optional.<ByteString>absent());
+ setSnapshot(null);
}
wasLastChunk = true;
-
+ if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
+ UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
+ new UnInitializedFollowerSnapshotReply(followerId);
+ context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
+ LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self");
+ }
} else {
followerToSnapshot.markSendStatus(true);
}
logIndex)
);
- if (followerToLog.isEmpty()) {
+ boolean applyModificationToState = followerToLog.isEmpty()
+ || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
+
+ if(applyModificationToState){
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
- } else {
+ }
+
+ if (!followerToLog.isEmpty()) {
sendAppendEntries(0, false);
}
}
- private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+ protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
// Send an AppendEntries to all followers
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
followerNextIndex, followerId);
- // FIXME : Sending one entry at a time
if(followerLogInformation.okToReplicate()) {
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+ // Try to send all the entries in the journal but not exceeding the max data size
+ // for a single AppendEntries message.
+ int maxEntries = (int) context.getReplicatedLog().size();
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
+ context.getConfigParams().getSnapshotChunkSize());
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0 &&
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
- initiateCaptureSnapshot(followerId, followerNextIndex);
+ if (canInstallSnapshot(followerNextIndex)) {
+ initiateCaptureSnapshot(followerId);
+ }
} else if(sendHeartbeat) {
// we send an AppendEntries, even if the follower is inactive
appendEntries);
}
- followerActor.tell(appendEntries.toSerializable(), actor());
+ followerActor.tell(appendEntries, actor());
}
/**
* 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
* then send the existing snapshot in chunks to the follower.
* @param followerId
- * @param followerNextIndex
*/
- private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
- if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
- context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
-
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
- final ActorSelection followerActor = context.getPeerActorSelection(followerId);
- sendSnapshotChunk(followerActor, followerId);
+ public boolean initiateCaptureSnapshot(String followerId) {
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in the memory, most likely another install is in progress
+ // no need to capture snapshot.
+ // This could happen if another follower needs an install when one is going on.
+ final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ sendSnapshotChunk(followerActor, followerId);
+ return true;
+ } else {
+ return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
+ }
+ }
+ private boolean canInstallSnapshot(long nextIndex){
+ // If the follower's nextIndex is -1 then we might as well send it a snapshot
+ // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
+ // in the snapshot
+ return (nextIndex == -1 ||
+ (!context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex)));
- } else {
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
- }
- }
}
private void sendInstallSnapshot() {
LOG.debug("{}: sendInstallSnapshot", logName());
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
- ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+ String followerId = e.getKey();
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ FollowerLogInformation followerLogInfo = e.getValue();
if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, e.getKey());
+ long nextIndex = followerLogInfo.getNextIndex();
+ if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
+ canInstallSnapshot(nextIndex)) {
+ sendSnapshotChunk(followerActor, followerId);
}
}
}
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+ byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
+ snapshot.get().getLastIncludedIndex(),
+ snapshot.get().getLastIncludedTerm(),
nextSnapshotChunk,
followerToSnapshot.incrementChunkIndex(),
followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
- ).toSerializable(),
+ ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
);
* Acccepts snaphot as ByteString, enters into map for future chunks
* creates and return a ByteString chunk
*/
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
- ByteString nextChunk = followerToSnapshot.getNextChunk();
+ byte[] nextChunk = followerToSnapshot.getNextChunk();
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
return nextChunk;
}
// need to be sent if there are other messages being sent to the remote
// actor.
heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
+ interval, context.getActor(), SendHeartBeat.INSTANCE,
context.getActorSystem().dispatcher(), context.getActor());
}
@Override
- public void close() throws Exception {
+ public void close() {
stopHeartBeat();
}
}
protected boolean isLeaderIsolated() {
- int minPresent = minIsolatedLeaderPeerCount;
+ int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
if (followerLogInformation.isFollowerActive()) {
--minPresent;
}
}
- public ByteString getNextChunk() {
+ public byte[] getNextChunk() {
int snapshotLength = getSnapshotBytes().size();
int start = incrementOffset();
int size = context.getConfigParams().getSnapshotChunkSize();
if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
+ } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+ size = snapshotLength - start;
}
+ byte[] nextChunk = new byte[size];
+ getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+ nextChunkHashCode = Arrays.hashCode(nextChunk);
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
- snapshotLength, start, size);
-
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
+ snapshotLength, start, size, nextChunkHashCode);
+ return nextChunk;
}
/**
public int followerLogSize() {
return followerToLog.size();
}
+
+ private static class SnapshotHolder {
+ private final long lastIncludedTerm;
+ private final long lastIncludedIndex;
+ private final ByteString snapshotBytes;
+
+ SnapshotHolder(Snapshot snapshot) {
+ this.lastIncludedTerm = snapshot.getLastAppliedTerm();
+ this.lastIncludedIndex = snapshot.getLastAppliedIndex();
+ this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+ }
+
+ long getLastIncludedTerm() {
+ return lastIncludedTerm;
+ }
+
+ long getLastIncludedIndex() {
+ return lastIncludedIndex;
+ }
+
+ ByteString getSnapshotBytes() {
+ return snapshotBytes;
+ }
+ }
}