X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=0d15e756354a9cb541e24f6b5e8e242fc5fee0e8;hp=85f2f153ab3046d6541225c95d3863be3435b6b0;hb=01c5a7cc52f8a438053d52ebb91e531493ca765a;hpb=c68d251880d95d6d2f8df70c67d2cdd3a3a47685 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 85f2f153ab..0d15e75635 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,8 +14,6 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collection; @@ -31,10 +29,12 @@ import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; +import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -44,6 +44,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import scala.concurrent.duration.FiniteDuration; /** @@ -79,16 +80,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // This would be passed as the hash code of the last chunk when sending the first chunk public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - private final Map followerToLog; + private final Map followerToLog = new HashMap<>(); private final Map mapFollowerToSnapshot = new HashMap<>(); private Cancellable heartbeatSchedule = null; private final Collection trackerList = new LinkedList<>(); - protected final int minReplicationCount; - - protected final int minIsolatedLeaderPeerCount; + private int minReplicationCount; private Optional snapshot; @@ -97,28 +96,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { setLeaderPayloadVersion(context.getPayloadVersion()); - final Builder ftlBuilder = ImmutableMap.builder(); - for (String followerId : context.getPeerAddresses().keySet()) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, -1, context); - - ftlBuilder.put(followerId, followerLogInformation); + for(PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); } - followerToLog = ftlBuilder.build(); leaderId = context.getId(); LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); - minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); - - // the isolated Leader peer count will be 1 less than the majority vote count. - // this is because the vote count has the self vote counted in it - // for e.g - // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 - // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 - // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 - minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0; + updateMinReplicaCount(); snapshot = Optional.absent(); @@ -141,6 +128,43 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } + public void addFollower(String followerId) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl( + context.getPeerInfo(followerId), -1, context); + followerToLog.put(followerId, followerLogInformation); + + if(heartbeatSchedule == null) { + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + } + } + + public void removeFollower(String followerId) { + followerToLog.remove(followerId); + mapFollowerToSnapshot.remove(followerId); + } + + public void updateMinReplicaCount() { + int numVoting = 0; + for(PeerInfo peer: context.getPeers()) { + if(peer.isVoting()) { + numVoting++; + } + } + + minReplicationCount = getMajorityVoteCount(numVoting); + } + + protected int getMinIsolatedLeaderPeerCount(){ + //the isolated Leader peer count will be 1 less than the majority vote count. + //this is because the vote count has the self vote counted in it + //for e.g + //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 + //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 + //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 + + return minReplicationCount > 0 ? (minReplicationCount - 1) : 0; + } + @VisibleForTesting void setSnapshot(@Nullable Snapshot snapshot) { if(snapshot != null) { @@ -196,7 +220,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); - if(followerLastLogIndex < 0 || (followersLastLogEntry != null && + if(appendEntriesReply.isForceInstallSnapshot()) { + // Reset the followers match and next index. This is to signal that this follower has nothing + // in common with this Leader and so would require a snapshot to be installed + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + // Force initiate a snapshot capture + initiateCaptureSnapshot(followerId); + } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null && followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { // The follower's log is empty or the last entry is present in the leader's journal // and the terms match so the follower is just behind the leader's journal from @@ -219,22 +251,51 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If there exists an N such that N > commitIndex, a majority // of matchIndex[i] ≥ N, and log[N].term == currentTerm: // set commitIndex = N (§5.3, §5.4). + if(LOG.isTraceEnabled()) { + LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); + } + for (long N = context.getCommitIndex() + 1; ; N++) { int replicatedCount = 1; + LOG.trace("{}: checking Nth index {}", logName(), N); for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex() >= N) { + final PeerInfo peerInfo = context.getPeerInfo(info.getId()); + if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { replicatedCount++; + } else if(LOG.isDebugEnabled()) { + LOG.debug("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), + info.getMatchIndex(), peerInfo); } } + if(LOG.isTraceEnabled()) { + LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount); + } + if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); - if (replicatedLogEntry != null && - replicatedLogEntry.getTerm() == currentTerm()) { + if (replicatedLogEntry == null) { + LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size()); + break; + } + + // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: + // "Raft never commits log entries from previous terms by counting replicas". + // However we keep looping so we can make progress when new entries in the current term + // reach consensus, as per §5.4.1: "once an entry from the current term is committed by + // counting replicas, then all prior entries are committed indirectly". + if (replicatedLogEntry.getTerm() == currentTerm()) { + LOG.trace("{}: Setting commit index to {}", logName(), N); context.setCommitIndex(N); + } else { + LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}", + logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); } } else { + LOG.trace("{}: minReplicationCount not reached - breaking", logName()); break; } } @@ -367,12 +428,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { - LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + if(followerLogInformation == null) { + // This can happen during AddServer if it times out. + LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", + logName(), followerId); + mapFollowerToSnapshot.remove(followerId); + return; + } + followerLogInformation.markFollowerActive(); if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { @@ -394,8 +463,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.remove(followerId); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", - logName(), followerId, followerLogInformation.getMatchIndex(), - followerLogInformation.getNextIndex()); + logName(), followerId, followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots @@ -403,7 +472,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { setSnapshot(null); } wasLastChunk = true; - + if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ + UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = + new UnInitializedFollowerSnapshotReply(followerId); + context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); + LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self"); + } } else { followerToSnapshot.markSendStatus(true); } @@ -464,7 +538,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { + protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { // Send an AppendEntries to all followers for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); @@ -539,7 +613,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntries = true; - initiateCaptureSnapshot(followerId, followerNextIndex); + if (canInstallSnapshot(followerNextIndex)) { + initiateCaptureSnapshot(followerId); + } } else if(sendHeartbeat) { // we send an AppendEntries, even if the follower is inactive @@ -583,39 +659,44 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply) * then send the existing snapshot in chunks to the follower. * @param followerId - * @param followerNextIndex */ - private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { - if (!context.getReplicatedLog().isPresent(followerNextIndex) && - context.getReplicatedLog().isInSnapshot(followerNextIndex)) { - - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot. - // This could happen if another follower needs an install when one is going on. - final ActorSelection followerActor = context.getPeerActorSelection(followerId); - sendSnapshotChunk(followerActor, followerId); + public boolean initiateCaptureSnapshot(String followerId) { + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot. + // This could happen if another follower needs an install when one is going on. + final ActorSelection followerActor = context.getPeerActorSelection(followerId); + sendSnapshotChunk(followerActor, followerId); + return true; + } else { + return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); + } + } + private boolean canInstallSnapshot(long nextIndex){ + // If the follower's nextIndex is -1 then we might as well send it a snapshot + // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present + // in the snapshot + return (nextIndex == -1 || + (!context.getReplicatedLog().isPresent(nextIndex) + && context.getReplicatedLog().isInSnapshot(nextIndex))); - } else { - context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), - this.getReplicatedToAllIndex(), followerId); - } - } } private void sendInstallSnapshot() { LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { - ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); + String followerId = e.getKey(); + ActorSelection followerActor = context.getPeerActorSelection(followerId); + FollowerLogInformation followerLogInfo = e.getValue(); if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - sendSnapshotChunk(followerActor, e.getKey()); + long nextIndex = followerLogInfo.getNextIndex(); + if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED || + canInstallSnapshot(nextIndex)) { + sendSnapshotChunk(followerActor, followerId); } } } @@ -717,7 +798,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } protected boolean isLeaderIsolated() { - int minPresent = minIsolatedLeaderPeerCount; + int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { if (followerLogInformation.isFollowerActive()) { --minPresent;