X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=4382cfec3237075087837b5f1402cadab249188f;hb=fbeed3e25ef80495c67bfe1698daf44b88fb10ff;hp=180f492082ee05bde4477cd82add5a0c4d846f69;hpb=74524984b8e8625f6b8e8c791c584844d49ccf45;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 180f492082..4382cfec32 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,7 +14,9 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -33,8 +35,8 @@ import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -46,11 +48,13 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import scala.concurrent.duration.FiniteDuration; /** * The behavior of a RaftActor when it is in the Leader state - *

+ * + *

* Leaders: *

*/ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Map followerToLog = new HashMap<>(); @@ -76,32 +79,31 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { /** * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really * expect the entries to be modified in sequence, hence we open-code the lookup. - * * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly, * but we already expect those to be far from frequent. */ private final Queue trackers = new LinkedList<>(); private Cancellable heartbeatSchedule = null; - private Optional snapshot = Optional.absent();; + private Optional snapshotHolder = Optional.absent(); private int minReplicationCount; protected AbstractLeader(RaftActorContext context, RaftState state, @Nullable AbstractLeader initializeFromLeader) { super(context, state); - if(initializeFromLeader != null) { + if (initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); - snapshot = initializeFromLeader.snapshot; + snapshotHolder = initializeFromLeader.snapshotHolder; trackers.addAll(initializeFromLeader.trackers); } else { - for(PeerInfo peerInfo: context.getPeers()) { + for (PeerInfo peerInfo: context.getPeers()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); followerToLog.put(peerInfo.getId(), followerLogInformation); } } - LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); + log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); updateMinReplicaCount(); @@ -133,7 +135,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getPeerInfo(followerId), -1, context); followerToLog.put(followerId, followerLogInformation); - if(heartbeatSchedule == null) { + if (heartbeatSchedule == null) { scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } } @@ -144,8 +146,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public void updateMinReplicaCount() { int numVoting = 0; - for(PeerInfo peer: context.getPeers()) { - if(peer.isVoting()) { + for (PeerInfo peer: context.getPeers()) { + if (peer.isVoting()) { numVoting++; } } @@ -153,7 +155,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { minReplicationCount = getMajorityVoteCount(numVoting); } - protected int getMinIsolatedLeaderPeerCount(){ + protected int getMinIsolatedLeaderPeerCount() { //the isolated Leader peer count will be 1 less than the majority vote count. //this is because the vote count has the self vote counted in it //for e.g @@ -161,51 +163,45 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 - return minReplicationCount > 0 ? (minReplicationCount - 1) : 0; + return minReplicationCount > 0 ? minReplicationCount - 1 : 0; } @VisibleForTesting - void setSnapshot(@Nullable Snapshot snapshot) { - if(snapshot != null) { - this.snapshot = Optional.of(new SnapshotHolder(snapshot)); - } else { - this.snapshot = Optional.absent(); - } + void setSnapshot(@Nullable SnapshotHolder snapshotHolder) { + this.snapshotHolder = Optional.fromNullable(snapshotHolder); } @VisibleForTesting boolean hasSnapshot() { - return snapshot.isPresent(); + return snapshotHolder.isPresent(); } @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); + log.debug("{}: handleAppendEntries: {}", logName(), appendEntries); return this; } @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - if(LOG.isTraceEnabled()) { - LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); - } + log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); // Update the FollowerLogInformation String followerId = appendEntriesReply.getFollowerId(); FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - if(followerLogInformation == null){ - LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); + if (followerLogInformation == null) { + log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); return this; } - if(followerLogInformation.timeSinceLastActivity() > - context.getConfigParams().getElectionTimeOutInterval().toMillis()) { - LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + - "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", + if (followerLogInformation.timeSinceLastActivity() + > context.getConfigParams().getElectionTimeOutInterval().toMillis()) { + log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), context.getLastApplied(), context.getCommitIndex()); } @@ -217,7 +213,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex); boolean updated = false; - if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { + if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { // The follower's log is actually ahead of the leader's log. Normally this doesn't happen // in raft as a node cannot become leader if it's log is behind another's. However, the // non-voting semantics deviate a bit from raft. Only voting members participate in @@ -231,9 +227,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // snapshot. It's also possible that the follower's last log index is behind the leader's. // However in this case the log terms won't match and the logs will conflict - this is handled // elsewhere. - LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot", - logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(), - context.getReplicatedLog().lastIndex()); + log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} " + + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(), + appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(), + context.getReplicatedLog().getSnapshotIndex()); followerLogInformation.setMatchIndex(-1); followerLogInformation.setNextIndex(-1); @@ -242,8 +239,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { updated = true; } else if (appendEntriesReply.isSuccess()) { - if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 && - followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) { + if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 + && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) { // The follower's last entry is present in the leader's journal but the terms don't match so the // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot @@ -255,17 +252,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.setNextIndex(followerLastLogIndex - 1); updated = true; - LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " + - "leader's {} - set the follower's next index to {}", - logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(), + log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " + + "leader's {} - set the follower's next index to {}", logName(), + followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(), followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex()); } else { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } } else { - LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); + log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}", + logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex()); - if(appendEntriesReply.isForceInstallSnapshot()) { + if (appendEntriesReply.isForceInstallSnapshot()) { // Reset the followers match and next index. This is to signal that this follower has nothing // in common with this Leader and so would require a snapshot to be installed followerLogInformation.setMatchIndex(-1); @@ -273,86 +271,110 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Force initiate a snapshot capture initiateCaptureSnapshot(followerId); - } else if(followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 && - followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) { + } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 + && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) { // The follower's log is empty or the last entry is present in the leader's journal // and the terms match so the follower is just behind the leader's journal from // the last snapshot, if any. We'll catch up the follower quickly by starting at the // follower's last log index. updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); + + log.info("{}: follower {} appears to be behind the leader from the last snapshot - " + + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId, + appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); } else { // The follower's log conflicts with leader's log so decrement follower's next index by 1 // in an attempt to find where the logs match. - followerLogInformation.decrNextIndex(); - updated = true; + if (followerLogInformation.decrNextIndex()) { + updated = true; - LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}", - logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog, - followerLogInformation.getNextIndex()); + log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}", + logName(), followerId, appendEntriesReply.getLogLastTerm(), + followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex()); + } } } - // Now figure out if this reply warrants a change in the commitIndex - // If there exists an N such that N > commitIndex, a majority - // of matchIndex[i] ≥ N, and log[N].term == currentTerm: - // set commitIndex = N (§5.3, §5.4). - if(LOG.isTraceEnabled()) { - LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", + if (log.isTraceEnabled()) { + log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); } - for (long N = context.getCommitIndex() + 1; ; N++) { - int replicatedCount = 1; + possiblyUpdateCommitIndex(); - LOG.trace("{}: checking Nth index {}", logName(), N); + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + + return this; + } + + private void possiblyUpdateCommitIndex() { + // Figure out if we can update the the commitIndex as follows: + // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N, + // and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). + for (long index = context.getCommitIndex() + 1; ; index++) { + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); + if (replicatedLogEntry == null) { + log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), index, context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().size()); + break; + } + + // Count our entry if it has been persisted. + int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1; + + if (replicatedCount == 0) { + // We don't commit and apply a log entry until we've gotten the ack from our local persistence, + // even though there *shouldn't* be any issue with updating the commit index if we get a consensus + // amongst the followers w/o the local persistence ack. + break; + } + + log.trace("{}: checking Nth index {}", logName(), index); for (FollowerLogInformation info : followerToLog.values()) { final PeerInfo peerInfo = context.getPeerInfo(info.getId()); - if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) { + if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) { replicatedCount++; - } else if(LOG.isTraceEnabled()) { - LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), + } else if (log.isTraceEnabled()) { + log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), info.getMatchIndex(), peerInfo); } } - if(LOG.isTraceEnabled()) { - LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount); + if (log.isTraceEnabled()) { + log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, + minReplicationCount); } if (replicatedCount >= minReplicationCount) { - ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); - if (replicatedLogEntry == null) { - LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", - logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size()); - break; - } - // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: // "Raft never commits log entries from previous terms by counting replicas". // However we keep looping so we can make progress when new entries in the current term // reach consensus, as per §5.4.1: "once an entry from the current term is committed by // counting replicas, then all prior entries are committed indirectly". if (replicatedLogEntry.getTerm() == currentTerm()) { - LOG.trace("{}: Setting commit index to {}", logName(), N); - context.setCommitIndex(N); + log.trace("{}: Setting commit index to {}", logName(), index); + context.setCommitIndex(index); } else { - LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}", - logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); + log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, " + + "term {} does not match the current term {}", logName(), index, + replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); } } else { - LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); + log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); break; } } // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", - logName(), followerId, context.getCommitIndex(), context.getLastApplied()); - } + log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(), + context.getCommitIndex(), context.getLastApplied()); applyLogToStateMachine(context.getCommitIndex()); } @@ -360,11 +382,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } - - //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); - - return this; } private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation, @@ -372,10 +389,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; - if(updated && LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", - logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), - followerLogInformation.getNextIndex()); + if (updated && log.isDebugEnabled()) { + log.debug( + "{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", + logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); } return updated; } @@ -425,7 +443,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", + log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); @@ -438,14 +456,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { beforeSendHeartbeat(); sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); - } else if(message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(((SendInstallSnapshot) message).getSnapshot()); + } else if (message instanceof SendInstallSnapshot) { + SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message; + setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes())); sendInstallSnapshot(); } else if (message instanceof Replicate) { replicate((Replicate) message); } else if (message instanceof InstallSnapshotReply) { handleInstallSnapshotReply((InstallSnapshotReply) message); + } else if (message instanceof CheckConsensusReached) { + possiblyUpdateCommitIndex(); } else { return super.handleMessage(sender, message); } @@ -454,20 +474,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { - LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); + log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); String followerId = reply.getFollowerId(); FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - if(followerLogInformation == null) { + if (followerLogInformation == null) { // This can happen during AddServer if it times out. - LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", + log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); if (installSnapshotState == null) { - LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", + log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } @@ -477,24 +497,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) { boolean wasLastChunk = false; if (reply.isSuccess()) { - if(installSnapshotState.isLastChunk(reply.getChunkIndex())) { + if (installSnapshotState.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply - if(LOG.isDebugEnabled()) { - LOG.debug("{}: InstallSnapshotReply received, " + - "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}", - logName(), reply.getChunkIndex(), followerId, - context.getReplicatedLog().getSnapshotIndex() + 1 - ); - } - long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex(); followerLogInformation.setMatchIndex(followerMatchIndex); followerLogInformation.setNextIndex(followerMatchIndex + 1); followerLogInformation.clearLeaderInstallSnapshotState(); - LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", - logName(), followerId, followerLogInformation.getMatchIndex(), - followerLogInformation.getNextIndex()); + log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - " + + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(), + followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); if (!anyFollowersInstallingSnapshot()) { // once there are no pending followers receiving snapshots @@ -503,18 +516,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } wasLastChunk = true; - if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ + if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = new UnInitializedFollowerSnapshotReply(followerId); context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); - LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self"); + log.debug("Sent message UnInitializedFollowerSnapshotReply to self"); } } else { installSnapshotState.markSendStatus(true); } } else { - LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", - logName(), reply.getChunkIndex()); + log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply); installSnapshotState.markSendStatus(false); } @@ -524,17 +536,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { purgeInMemoryLog(); } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) { ActorSelection followerActor = context.getPeerActorSelection(followerId); - if(followerActor != null) { + if (followerActor != null) { sendSnapshotChunk(followerActor, followerLogInformation); } } } else { - LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", + log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", logName(), reply.getChunkIndex(), followerId, installSnapshotState.getChunkIndex()); - if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){ + if (reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) { // Since the Follower did not find this index to be valid we should reset the follower snapshot // so that Installing the snapshot can resume from the beginning installSnapshotState.reset(); @@ -543,8 +555,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private boolean anyFollowersInstallingSnapshot() { - for(FollowerLogInformation info: followerToLog.values()) { - if(info.getInstallSnapshotState() != null) { + for (FollowerLogInformation info: followerToLog.values()) { + if (info.getInstallSnapshotState() != null) { return true; } @@ -556,12 +568,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), - replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); + log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(), + replicate.isSendImmediate()); // Create a tracker entry we will use this later to notify the // client actor - if(replicate.getClientActor() != null) { + if (replicate.getClientActor() != null) { trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), logIndex)); } @@ -569,12 +582,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { boolean applyModificationToState = !context.anyVotingPeers() || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); - if(applyModificationToState){ + if (applyModificationToState) { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } - if (!followerToLog.isEmpty()) { + if (replicate.isSendImmediate() && !followerToLog.isEmpty()) { sendAppendEntries(0, false); } } @@ -585,20 +598,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final String followerId = e.getKey(); final FollowerLogInformation followerLogInformation = e.getValue(); // This checks helps not to send a repeat message to the follower - if(!followerLogInformation.isFollowerActive() || - followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { + if (!followerLogInformation.isFollowerActive() + || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat); } } } /** - * * This method checks if any update needs to be sent to the given follower. This includes append log entries, * sending next snapshot chunk, and initiating a snapshot. - * @return true if any update is sent, false otherwise */ - private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, boolean sendHeartbeat, boolean isHeartbeat) { @@ -614,7 +624,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // if install snapshot is in process , then sent next chunk if possible if (isFollowerActive && installSnapshotState.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerLogInformation); - } else if(sendHeartbeat) { + } else if (sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntries = true; } @@ -622,17 +632,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { - LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); + if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) { + log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, " + + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive, + followerNextIndex, leaderLastIndex, leaderSnapShotIndex); } if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { - LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), + log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), followerNextIndex, followerId); - if(followerLogInformation.okToReplicate()) { + if (followerLogInformation.okToReplicate()) { // Try to send all the entries in the journal but not exceeding the max data size // for a single AppendEntries message. int maxEntries = (int) context.getReplicatedLog().size(); @@ -640,26 +651,31 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getConfigParams().getSnapshotChunkSize()); sendAppendEntries = true; } - } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { + } else if (isFollowerActive && followerNextIndex >= 0 + && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " + - "follower-nextIndex: %d, leader-snapshot-index: %d, " + - "leader-last-index: %d", logName(), followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); - } - // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntries = true; if (canInstallSnapshot(followerNextIndex)) { + log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader " + + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex, + context.getReplicatedLog().size()); + initiateCaptureSnapshot(followerId); + } else { + // It doesn't seem like we should ever reach here - most likely indicates sonething is + // wrong. + log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, " + + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), + followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex, + context.getReplicatedLog().size()); } - } else if(sendHeartbeat) { + } else if (sendHeartbeat) { // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again sendAppendEntries = true; @@ -667,7 +683,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } - if(sendAppendEntries) { + if (sendAppendEntries) { sendAppendEntriesToFollower(followerActor, entries, followerLogInformation); } } @@ -692,8 +708,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { getLogEntryTerm(followerNextIndex - 1), entries, leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion()); - if(!entries.isEmpty() || LOG.isTraceEnabled()) { - LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(), + if (!entries.isEmpty() || log.isTraceEnabled()) { + log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(), appendEntries); } @@ -703,6 +719,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { /** * Initiates a snapshot capture to install on a follower. * + *

* Install Snapshot works as follows * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor. * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to @@ -720,7 +737,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { */ public boolean initiateCaptureSnapshot(String followerId) { FollowerLogInformation followerLogInfo = followerToLog.get(followerId); - if (snapshot.isPresent()) { + if (snapshotHolder.isPresent()) { // If a snapshot is present in the memory, most likely another install is in progress no need to capture // snapshot. This could happen if another follower needs an install when one is going on. final ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -728,31 +745,30 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState. sendSnapshotChunk(followerActor, followerLogInfo); return true; - } else { - boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), - this.getReplicatedToAllIndex(), followerId); - if(captureInitiated) { - followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState( - context.getConfigParams().getSnapshotChunkSize(), logName())); - } + } - return captureInitiated; + boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); + if (captureInitiated) { + followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState( + context.getConfigParams().getSnapshotChunkSize(), logName())); } + + return captureInitiated; } - private boolean canInstallSnapshot(long nextIndex){ + private boolean canInstallSnapshot(long nextIndex) { // If the follower's nextIndex is -1 then we might as well send it a snapshot // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present // in the snapshot - return nextIndex == -1 || - (!context.getReplicatedLog().isPresent(nextIndex) - && context.getReplicatedLog().isInSnapshot(nextIndex)); + return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex) + && context.getReplicatedLog().isInSnapshot(nextIndex); } private void sendInstallSnapshot() { - LOG.debug("{}: sendInstallSnapshot", logName()); + log.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { String followerId = e.getKey(); ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -760,9 +776,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { long nextIndex = followerLogInfo.getNextIndex(); - if (followerLogInfo.getInstallSnapshotState() != null || - context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED || - canInstallSnapshot(nextIndex)) { + if (followerLogInfo.getInstallSnapshotState() != null + || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED + || canInstallSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, followerLogInfo); } } @@ -774,7 +790,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * InstallSnapshot should qualify as a heartbeat too. */ private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) { - if (snapshot.isPresent()) { + if (snapshotHolder.isPresent()) { LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState(); if (installSnapshotState == null) { installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(), @@ -782,44 +798,45 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState); } - // Ensure the snapshot bytes are set - this is a no-op. - installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes()); + try { + // Ensure the snapshot bytes are set - this is a no-op. + installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); - byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); + byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); - LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), - nextSnapshotChunk.length); + log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), + nextSnapshotChunk.length); - int nextChunkIndex = installSnapshotState.incrementChunkIndex(); - Optional serverConfig = Optional.absent(); - if(installSnapshotState.isLastChunk(nextChunkIndex)) { - serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); - } + int nextChunkIndex = installSnapshotState.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if (installSnapshotState.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - snapshot.get().getLastIncludedIndex(), - snapshot.get().getLastIncludedTerm(), - nextSnapshotChunk, - nextChunkIndex, - installSnapshotState.getTotalChunks(), - Optional.of(installSnapshotState.getLastChunkHashCode()), - serverConfig - ).toSerializable(followerLogInfo.getRaftVersion()), - actor() - ); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - logName(), followerActor.path(), installSnapshotState.getChunkIndex(), - installSnapshotState.getTotalChunks()); + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + snapshotHolder.get().getLastIncludedIndex(), + snapshotHolder.get().getLastIncludedTerm(), + nextSnapshotChunk, + nextChunkIndex, + installSnapshotState.getTotalChunks(), + Optional.of(installSnapshotState.getLastChunkHashCode()), + serverConfig + ).toSerializable(followerLogInfo.getRaftVersion()), + actor() + ); + + log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), + installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); + } catch (IOException e) { + throw Throwables.propagate(e); } } } private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - LOG.trace("{}: Sending heartbeat", logName()); + log.trace("{}: Sending heartbeat", logName()); sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true); } } @@ -868,7 +885,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId()); - if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { + if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { --minPresent; if (minPresent == 0) { return false; @@ -905,15 +922,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.size(); } - private static class SnapshotHolder { + static class SnapshotHolder { private final long lastIncludedTerm; private final long lastIncludedIndex; - private final ByteString snapshotBytes; + private final ByteSource snapshotBytes; - SnapshotHolder(Snapshot snapshot) { + SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) { this.lastIncludedTerm = snapshot.getLastAppliedTerm(); this.lastIncludedIndex = snapshot.getLastAppliedIndex(); - this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + this.snapshotBytes = snapshotBytes; } long getLastIncludedTerm() { @@ -924,7 +941,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return lastIncludedIndex; } - ByteString getSnapshotBytes() { + ByteSource getSnapshotBytes() { return snapshotBytes; } }