import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
private int minReplicationCount;
- private int minIsolatedLeaderPeerCount;
-
private Optional<SnapshotHolder> snapshot;
public AbstractLeader(RaftActorContext context) {
setLeaderPayloadVersion(context.getPayloadVersion());
- for (String followerId : context.getPeerAddresses().keySet()) {
- FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId, -1, context);
-
- followerToLog.put(followerId, followerLogInformation);
+ for(PeerInfo peerInfo: context.getPeers()) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+ followerToLog.put(peerInfo.getId(), followerLogInformation);
}
leaderId = context.getId();
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
- updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+ updateMinReplicaCount();
snapshot = Optional.absent();
return followerToLog.keySet();
}
- public void addFollower(String followerId, FollowerState followerState) {
- FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
- followerLogInformation.setFollowerState(followerState);
+ public void addFollower(String followerId) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
+ context.getPeerInfo(followerId), -1, context);
followerToLog.put(followerId, followerLogInformation);
if(heartbeatSchedule == null) {
followerToLog.remove(followerId);
}
- public void updateMinReplicaCountAndMinIsolatedLeaderPeerCount(){
- minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
+ public void updateMinReplicaCount() {
+ int numVoting = 0;
+ for(PeerInfo peer: context.getPeers()) {
+ if(peer.isVoting()) {
+ numVoting++;
+ }
+ }
+
+ minReplicationCount = getMajorityVoteCount(numVoting);
+ }
- //the isolated Leader peer count will be 1 less than the majority vote count.
+ protected int getMinIsolatedLeaderPeerCount(){
+ //the isolated Leader peer count will be 1 less than the majority vote count.
//this is because the vote count has the self vote counted in it
//for e.g
//0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
//2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
//4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
- minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
- }
- protected int getMinIsolatedLeaderPeerCount(){
- return minIsolatedLeaderPeerCount;
+ return minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
}
@VisibleForTesting
int replicatedCount = 1;
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex() >= N) {
+ if ((info.getMatchIndex() >= N) && (context.getPeerInfo(followerId).isVoting())) {
replicatedCount++;
}
}
if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
- context.setCommitIndex(N);
- } else {
+ if (replicatedLogEntry == null) {
break;
}
+
+ // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+ // "Raft never commits log entries from previous terms by counting replicas".
+ // However we keep looping so we can make progress when new entries in the current term
+ // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+ // counting replicas, then all prior entries are committed indirectly".
+ if (replicatedLogEntry.getTerm() == currentTerm()) {
+ context.setCommitIndex(N);
+ }
} else {
break;
}
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
- LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
logName(), followerId);
return;
}
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ if(followerLogInformation == null) {
+ // This can happen during AddServer if it times out.
+ LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
+ mapFollowerToSnapshot.remove(followerId);
+ return;
+ }
+
followerLogInformation.markFollowerActive();
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
setSnapshot(null);
}
wasLastChunk = true;
- FollowerState followerState = followerLogInformation.getFollowerState();
- if(followerState == FollowerState.VOTING_NOT_INITIALIZED){
+ if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
new UnInitializedFollowerSnapshotReply(followerId);
context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
}
}
- private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+ protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
// Send an AppendEntries to all followers
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
* then send the existing snapshot in chunks to the follower.
* @param followerId
*/
- public void initiateCaptureSnapshot(String followerId) {
+ public boolean initiateCaptureSnapshot(String followerId) {
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot.
// This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
-
-
+ return true;
} else {
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
this.getReplicatedToAllIndex(), followerId);
}
}
private void sendInstallSnapshot() {
LOG.debug("{}: sendInstallSnapshot", logName());
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
- ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+ String followerId = e.getKey();
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
FollowerLogInformation followerLogInfo = e.getValue();
if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex();
- if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED ||
+ long nextIndex = followerLogInfo.getNextIndex();
+ if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
canInstallSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, e.getKey());
+ sendSnapshotChunk(followerActor, followerId);
}
}
}
}
protected boolean isLeaderIsolated() {
- int minPresent = minIsolatedLeaderPeerCount;
+ int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
if (followerLogInformation.isFollowerActive()) {
--minPresent;