X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=a40e75f4c3b2e5629cf2904891a61c7819ee12c9;hp=2c433f90076b20cda5002b819bf08b4dd7211104;hb=1b0f84c4957e464bad6f7cb7350a8171c3d1621b;hpb=a681e6bec3bbd7b536302ee9e083ae04b7f5ebdd diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 2c433f9007..a40e75f4c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,10 +14,9 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; -import com.google.protobuf.ByteString; +import com.google.common.io.ByteSource; import java.io.IOException; +import java.io.ObjectOutputStream; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -26,13 +25,22 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; +import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -41,12 +49,17 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import scala.concurrent.duration.FiniteDuration; /** * The behavior of a RaftActor when it is in the Leader state - *

+ * + *

* Leaders: *

*/ public abstract class AbstractLeader extends AbstractRaftActorBehavior { + private final Map followerToLog = new HashMap<>(); - // The index of the first chunk that is sent when installing a snapshot - public static final int FIRST_CHUNK_INDEX = 1; - - // The index that the follower should respond with if it needs the install snapshot to be reset - public static final int INVALID_CHUNK_INDEX = -1; - - // This would be passed as the hash code of the last chunk when sending the first chunk - public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; + /** + * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really + * expect the entries to be modified in sequence, hence we open-code the lookup. + * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly, + * but we already expect those to be far from frequent. + */ + private final Queue trackers = new LinkedList<>(); - private final Map followerToLog; - private final Map mapFollowerToSnapshot = new HashMap<>(); + /** + * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the + * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold. + * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers. + */ + private final Map sharedSerializedAppendEntriesStreams = new HashMap<>(); + private final MessageSlicer appendEntriesMessageSlicer; private Cancellable heartbeatSchedule = null; - - private final Collection trackerList = new LinkedList<>(); - - protected final int minReplicationCount; - - protected final int minIsolatedLeaderPeerCount; - - private Optional snapshot; - - public AbstractLeader(RaftActorContext context) { - super(context, RaftState.Leader); - - final Builder ftlBuilder = ImmutableMap.builder(); - for (String followerId : context.getPeerAddresses().keySet()) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, -1, context); - - ftlBuilder.put(followerId, followerLogInformation); + private Optional snapshotHolder = Optional.absent(); + private int minReplicationCount; + + protected AbstractLeader(final RaftActorContext context, final RaftState state, + @Nullable final AbstractLeader initializeFromLeader) { + super(context, state); + + appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName()) + .messageSliceSize(context.getConfigParams().getSnapshotChunkSize()) + .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3, + TimeUnit.MILLISECONDS).build(); + + if (initializeFromLeader != null) { + followerToLog.putAll(initializeFromLeader.followerToLog); + snapshotHolder = initializeFromLeader.snapshotHolder; + trackers.addAll(initializeFromLeader.trackers); + } else { + for (PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); + } } - followerToLog = ftlBuilder.build(); - - leaderId = context.getId(); - - LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); - minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); + log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); - // the isolated Leader peer count will be 1 less than the majority vote count. - // this is because the vote count has the self vote counted in it - // for e.g - // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 - // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 - // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 - minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0; - - snapshot = Optional.absent(); + updateMinReplicaCount(); // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs @@ -128,6 +135,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + protected AbstractLeader(final RaftActorContext context, final RaftState state) { + this(context, state, null); + } + /** * Return an immutable collection of follower identifiers. * @@ -137,99 +148,251 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } + public void addFollower(final String followerId) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl( + context.getPeerInfo(followerId), -1, context); + followerToLog.put(followerId, followerLogInformation); + + if (heartbeatSchedule == null) { + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + } + } + + public void removeFollower(final String followerId) { + followerToLog.remove(followerId); + } + + public void updateMinReplicaCount() { + int numVoting = 0; + for (PeerInfo peer: context.getPeers()) { + if (peer.isVoting()) { + numVoting++; + } + } + + minReplicationCount = getMajorityVoteCount(numVoting); + } + + protected int getMinIsolatedLeaderPeerCount() { + //the isolated Leader peer count will be 1 less than the majority vote count. + //this is because the vote count has the self vote counted in it + //for e.g + //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 + //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 + //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 + + return minReplicationCount > 0 ? minReplicationCount - 1 : 0; + } + + @VisibleForTesting + void setSnapshot(@Nullable final SnapshotHolder snapshotHolder) { + this.snapshotHolder = Optional.fromNullable(snapshotHolder); + } + @VisibleForTesting - void setSnapshot(Optional snapshot) { - this.snapshot = snapshot; + boolean hasSnapshot() { + return snapshotHolder.isPresent(); } @Override - protected RaftActorBehavior handleAppendEntries(ActorRef sender, - AppendEntries appendEntries) { + protected RaftActorBehavior handleAppendEntries(final ActorRef sender, + final AppendEntries appendEntries) { - LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); + log.debug("{}: handleAppendEntries: {}", logName(), appendEntries); return this; } @Override - protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply) { - - if(LOG.isTraceEnabled()) { - LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); - } + protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, + final AppendEntriesReply appendEntriesReply) { + log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); // Update the FollowerLogInformation String followerId = appendEntriesReply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - if(followerLogInformation == null){ - LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); + if (followerLogInformation == null) { + log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); return this; } - if(followerLogInformation.timeSinceLastActivity() > - context.getConfigParams().getElectionTimeOutInterval().toMillis()) { - LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + - "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", + if (followerLogInformation.timeSinceLastActivity() + > context.getConfigParams().getElectionTimeOutInterval().toMillis()) { + log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), context.getLastApplied(), context.getCommitIndex()); } followerLogInformation.markFollowerActive(); + followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); + followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion()); + long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); + long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex); boolean updated = false; - if (appendEntriesReply.isSuccess()) { - updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); - updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; - - if(updated && LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(), - followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); + if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { + // The follower's log is actually ahead of the leader's log. Normally this doesn't happen + // in raft as a node cannot become leader if it's log is behind another's. However, the + // non-voting semantics deviate a bit from raft. Only voting members participate in + // elections and can become leader so it's possible for a non-voting follower to be ahead + // of the leader. This can happen if persistence is disabled and all voting members are + // restarted. In this case, the voting leader will start out with an empty log however + // the non-voting followers still retain the previous data in memory. On the first + // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex + // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned + // lastLogIndex may be higher in which case we want to reset the follower by installing a + // snapshot. It's also possible that the follower's last log index is behind the leader's. + // However in this case the log terms won't match and the logs will conflict - this is handled + // elsewhere. + log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} " + + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(), + appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(), + context.getReplicatedLog().getSnapshotIndex()); + + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + initiateCaptureSnapshot(followerId); + + updated = true; + } else if (appendEntriesReply.isSuccess()) { + if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 + && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) { + // The follower's last entry is present in the leader's journal but the terms don't match so the + // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means + // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot + // and no longer in the journal. Either way, we set the follower's next index to 1 less than the last + // index reported by the follower. For the former case, the leader will send all entries starting with + // the previous follower's index and the follower will remove and replace the conflicting entries as + // needed. For the latter, the leader will initiate an install snapshot. + + followerLogInformation.setNextIndex(followerLastLogIndex - 1); + updated = true; + + log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " + + "leader's {} - set the follower's next index to {}", logName(), + followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(), + followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex()); + } else { + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } } else { - LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); + log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}", + logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex()); + + if (appendEntriesReply.isForceInstallSnapshot()) { + // Reset the followers match and next index. This is to signal that this follower has nothing + // in common with this Leader and so would require a snapshot to be installed + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + // Force initiate a snapshot capture + initiateCaptureSnapshot(followerId); + } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 + && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) { + // The follower's log is empty or the last entry is present in the leader's journal + // and the terms match so the follower is just behind the leader's journal from + // the last snapshot, if any. We'll catch up the follower quickly by starting at the + // follower's last log index. + + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); + + log.info("{}: follower {} appears to be behind the leader from the last snapshot - " + + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId, + followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); + } else { + // The follower's log conflicts with leader's log so decrement follower's next index by 1 + // in an attempt to find where the logs match. - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about + if (followerLogInformation.decrNextIndex()) { + updated = true; - followerLogInformation.decrNextIndex(); + log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}", + logName(), followerId, appendEntriesReply.getLogLastTerm(), + followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex()); + } + } + } + + if (log.isTraceEnabled()) { + log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); } - // Now figure out if this reply warrants a change in the commitIndex - // If there exists an N such that N > commitIndex, a majority - // of matchIndex[i] ≥ N, and log[N].term == currentTerm: - // set commitIndex = N (§5.3, §5.4). - for (long N = context.getCommitIndex() + 1; ; N++) { - int replicatedCount = 1; + possiblyUpdateCommitIndex(); + + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + return this; + } + + private void possiblyUpdateCommitIndex() { + // Figure out if we can update the the commitIndex as follows: + // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N, + // and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). + for (long index = context.getCommitIndex() + 1; ; index++) { + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); + if (replicatedLogEntry == null) { + log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), index, context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().size()); + break; + } + + // Count our entry if it has been persisted. + int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1; + + if (replicatedCount == 0) { + // We don't commit and apply a log entry until we've gotten the ack from our local persistence, + // even though there *shouldn't* be any issue with updating the commit index if we get a consensus + // amongst the followers w/o the local persistence ack. + break; + } + + log.trace("{}: checking Nth index {}", logName(), index); for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex() >= N) { + final PeerInfo peerInfo = context.getPeerInfo(info.getId()); + if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) { replicatedCount++; + } else if (log.isTraceEnabled()) { + log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), + info.getMatchIndex(), peerInfo); } } + if (log.isTraceEnabled()) { + log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, + minReplicationCount); + } + if (replicatedCount >= minReplicationCount) { - ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); - if (replicatedLogEntry != null && - replicatedLogEntry.getTerm() == currentTerm()) { - context.setCommitIndex(N); + // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: + // "Raft never commits log entries from previous terms by counting replicas". + // However we keep looping so we can make progress when new entries in the current term + // reach consensus, as per §5.4.1: "once an entry from the current term is committed by + // counting replicas, then all prior entries are committed indirectly". + if (replicatedLogEntry.getTerm() == currentTerm()) { + log.trace("{}: Setting commit index to {}", logName(), index); + context.setCommitIndex(index); + } else { + log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, " + + "term {} does not match the current term {}", logName(), index, + replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); } } else { + log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); break; } } // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", - logName(), followerId, context.getCommitIndex(), context.getLastApplied()); - } + log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(), + context.getCommitIndex(), context.getLastApplied()); applyLogToStateMachine(context.getCommitIndex()); } @@ -237,10 +400,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } + } - //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); - return this; + private boolean updateFollowerLogInformation(final FollowerLogInformation followerLogInformation, + final AppendEntriesReply appendEntriesReply) { + boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); + updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; + + if (updated && log.isDebugEnabled()) { + log.debug( + "{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", + logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); + } + return updated; } private void purgeInMemoryLog() { @@ -257,8 +430,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @Override - protected ClientRequestTracker removeClientRequestTracker(long logIndex) { - final Iterator it = trackerList.iterator(); + protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { + final Iterator it = trackers.iterator(); while (it.hasNext()) { final ClientRequestTracker t = it.next(); if (t.getIndex() == logIndex) { @@ -271,28 +444,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @Override - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - for (ClientRequestTracker tracker : trackerList) { - if (tracker.getIndex() == logIndex) { - return tracker; - } - } - return null; - } - - @Override - protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply) { + protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, + final RequestVoteReply requestVoteReply) { return this; } protected void beforeSendHeartbeat(){} @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { + public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); - Object message = fromSerializableMessage(originalMessage); + if (appendEntriesMessageSlicer.handleMessage(message)) { + return this; + } if (message instanceof RaftRPC) { RaftRPC rpc = (RaftRPC) message; @@ -300,12 +465,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", + log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return switchBehavior(new Follower(context)); + // This is a special case. Normally when stepping down as leader we don't process and reply to the + // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a + // RequestVote, process the RequestVote before switching to Follower. This enables the requesting + // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower + // state and starting a new election and grabbing leadership back before the other candidate node can + // start a new election due to lack of responses. This case would only occur if there isn't a majority + // of other nodes available that can elect the requesting candidate. Since we're transferring + // leadership, we should make every effort to get the requesting node elected. + if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) { + log.debug("{}: Leadership transfer in progress - processing RequestVote", logName()); + super.handleMessage(sender, message); + } + + return internalSwitchBehavior(RaftState.Follower); } } @@ -313,200 +491,226 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { beforeSendHeartbeat(); sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); - return this; - - } else if(message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + } else if (message instanceof SendInstallSnapshot) { + SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message; + setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes())); sendInstallSnapshot(); - } else if (message instanceof Replicate) { replicate((Replicate) message); - - } else if (message instanceof InstallSnapshotReply){ + } else if (message instanceof InstallSnapshotReply) { handleInstallSnapshotReply((InstallSnapshotReply) message); - + } else if (message instanceof CheckConsensusReached) { + possiblyUpdateCommitIndex(); + } else { + return super.handleMessage(sender, message); } - - return super.handleMessage(sender, message); + return this; } - private void handleInstallSnapshotReply(InstallSnapshotReply reply) { - LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); + private void handleInstallSnapshotReply(final InstallSnapshotReply reply) { + log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); String followerId = reply.getFollowerId(); - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + if (followerLogInformation == null) { + // This can happen during AddServer if it times out. + log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", + logName(), followerId); + return; + } - if (followerToSnapshot == null) { - LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); + if (installSnapshotState == null) { + log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); followerLogInformation.markFollowerActive(); - if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) { boolean wasLastChunk = false; if (reply.isSuccess()) { - if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { + if (installSnapshotState.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply - if(LOG.isDebugEnabled()) { - LOG.debug("{}: InstallSnapshotReply received, " + - "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}", - logName(), reply.getChunkIndex(), followerId, - context.getReplicatedLog().getSnapshotIndex() + 1 - ); - } - followerLogInformation.setMatchIndex( - context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation.setNextIndex( - context.getReplicatedLog().getSnapshotIndex() + 1); - mapFollowerToSnapshot.remove(followerId); + long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex(); + followerLogInformation.setMatchIndex(followerMatchIndex); + followerLogInformation.setNextIndex(followerMatchIndex + 1); + followerLogInformation.clearLeaderInstallSnapshotState(); - LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", - logName(), followerId, followerLogInformation.getMatchIndex(), - followerLogInformation.getNextIndex()); + log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - " + + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(), + followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); - if (mapFollowerToSnapshot.isEmpty()) { + if (!anyFollowersInstallingSnapshot()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory - setSnapshot(Optional.absent()); + setSnapshot(null); } - wasLastChunk = true; + wasLastChunk = true; + if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = + new UnInitializedFollowerSnapshotReply(followerId); + context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); + log.debug("Sent message UnInitializedFollowerSnapshotReply to self"); + } } else { - followerToSnapshot.markSendStatus(true); + installSnapshotState.markSendStatus(true); } } else { - LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", - logName(), reply.getChunkIndex()); + log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply); - followerToSnapshot.markSendStatus(false); + installSnapshotState.markSendStatus(false); } - if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { - // Since the follower is now caught up try to purge the log. - purgeInMemoryLog(); - } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + if (wasLastChunk) { + if (!context.getSnapshotManager().isCapturing()) { + // Since the follower is now caught up try to purge the log. + purgeInMemoryLog(); + } + } else { ActorSelection followerActor = context.getPeerActorSelection(followerId); - if(followerActor != null) { - sendSnapshotChunk(followerActor, followerId); + if (followerActor != null) { + sendSnapshotChunk(followerActor, followerLogInformation); } } } else { - LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", + log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", logName(), reply.getChunkIndex(), followerId, - followerToSnapshot.getChunkIndex()); + installSnapshotState.getChunkIndex()); - if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + if (reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) { // Since the Follower did not find this index to be valid we should reset the follower snapshot // so that Installing the snapshot can resume from the beginning - followerToSnapshot.reset(); + installSnapshotState.reset(); + } + } + } + + private boolean anyFollowersInstallingSnapshot() { + for (FollowerLogInformation info: followerToLog.values()) { + if (info.getInstallSnapshotState() != null) { + return true; } + } + + return false; } - private void replicate(Replicate replicate) { + private void replicate(final Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(), - replicate.getIdentifier(), logIndex); + log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(), + replicate.isSendImmediate()); // Create a tracker entry we will use this later to notify the // client actor - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); + if (replicate.getClientActor() != null) { + trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), + logIndex)); + } - if (followerToLog.isEmpty()) { + boolean applyModificationToState = !context.anyVotingPeers() + || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); + + if (applyModificationToState) { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); - } else { + } + + if (replicate.isSendImmediate() && !followerToLog.isEmpty()) { sendAppendEntries(0, false); } } - private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { + protected void sendAppendEntries(final long timeSinceLastActivityInterval, final boolean isHeartbeat) { // Send an AppendEntries to all followers for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); final FollowerLogInformation followerLogInformation = e.getValue(); // This checks helps not to send a repeat message to the follower - if(!followerLogInformation.isFollowerActive() || - followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { + if (!followerLogInformation.isFollowerActive() + || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat); } } } /** - * * This method checks if any update needs to be sent to the given follower. This includes append log entries, * sending next snapshot chunk, and initiating a snapshot. - * @return true if any update is sent, false otherwise */ - - private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, - boolean sendHeartbeat, boolean isHeartbeat) { + private void sendUpdatesToFollower(final String followerId, final FollowerLogInformation followerLogInformation, + final boolean sendHeartbeat, final boolean isHeartbeat) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); boolean sendAppendEntries = false; - List entries = Collections.EMPTY_LIST; + List entries = Collections.emptyList(); - if (mapFollowerToSnapshot.get(followerId) != null) { + LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); + if (installSnapshotState != null) { // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { - sendSnapshotChunk(followerActor, followerId); - } else if(sendHeartbeat) { + if (isFollowerActive && installSnapshotState.canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerLogInformation); + } else if (sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntries = true; } + } else if (followerLogInformation.isLogEntrySlicingInProgress()) { + sendAppendEntries = sendHeartbeat; } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { - LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); + if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) { + log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, " + + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive, + followerNextIndex, leaderLastIndex, leaderSnapShotIndex); } if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { - LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), + log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), followerNextIndex, followerId); - // FIXME : Sending one entry at a time - if(followerLogInformation.okToReplicate()) { - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + if (followerLogInformation.okToReplicate()) { + entries = getEntriesToSend(followerLogInformation, followerActor); sendAppendEntries = true; } - } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { + } else if (isFollowerActive && followerNextIndex >= 0 + && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + - "follower-nextIndex: %d, leader-snapshot-index: %d, " + - "leader-last-index: %d", logName(), followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); - } - // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntries = true; - initiateCaptureSnapshot(followerId, followerNextIndex); + if (canInstallSnapshot(followerNextIndex)) { + log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader " + + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex, + context.getReplicatedLog().size()); + + initiateCaptureSnapshot(followerId); + } else { + // It doesn't seem like we should ever reach here - most likely indicates sonething is + // wrong. + log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, " + + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), + followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex, + context.getReplicatedLog().size()); + } - } else if(sendHeartbeat) { + } else if (sendHeartbeat) { // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again sendAppendEntries = true; @@ -514,73 +718,175 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } - if(sendAppendEntries) { - sendAppendEntriesToFollower(followerActor, followerNextIndex, - entries, followerId); + if (sendAppendEntries) { + sendAppendEntriesToFollower(followerActor, entries, followerLogInformation); + } + } + } + + private List getEntriesToSend(FollowerLogInformation followerLogInfo, + ActorSelection followerActor) { + // Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries + // message. + int maxEntries = (int) context.getReplicatedLog().size(); + final int maxDataSize = context.getConfigParams().getSnapshotChunkSize(); + final long followerNextIndex = followerLogInfo.getNextIndex(); + List entries = context.getReplicatedLog().getFrom(followerNextIndex, + maxEntries, maxDataSize); + + // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If + // that is the case, then we need to slice it into smaller chunks. + if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) { + // Don't need to slice. + return entries; + } + + log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(), + maxDataSize); + + // If an AppendEntries has already been serialized for the log index then reuse the + // SharedFileBackedOutputStream. + final Long logIndex = entries.get(0).getIndex(); + SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex); + if (fileBackedStream == null) { + fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance(); + + final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), + getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries, + context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion()); + + log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries, + followerLogInfo.getId()); + + try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) { + out.writeObject(appendEntries); + } catch (IOException e) { + log.error("{}: Error serializing {}", logName(), appendEntries, e); + fileBackedStream.cleanup(); + return Collections.emptyList(); } + + sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream); + + fileBackedStream.setOnCleanupCallback(index -> { + log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index); + sharedSerializedAppendEntriesStreams.remove(index); + }, logIndex); + } else { + log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId()); + fileBackedStream.incrementUsageCount(); } + + log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId()); + + // Record that slicing is in progress for the follower. + followerLogInfo.setSlicedLogEntryIndex(logIndex); + + final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId()); + appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier) + .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor()) + .onFailureCallback(failure -> { + log.error("{}: Error slicing AppendEntries for follower {}", logName(), + followerLogInfo.getId(), failure); + followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX); + }).build()); + + return Collections.emptyList(); } - private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, - List entries, String followerId) { + private void sendAppendEntriesToFollower(ActorSelection followerActor, List entries, + FollowerLogInformation followerLogInformation) { + // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from + // possibly committing and applying conflicting entries (those with same index, different term) from a prior + // term that weren't replicated to a majority, which would be a violation of raft. + // - if the follower isn't active. In this case we don't know the state of the follower and we send an + // empty AppendEntries as a heart beat to prevent election. + // - if we're in the process of installing a snapshot. In this case we don't send any new entries but still + // need to send AppendEntries to prevent election. + // - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we + // need to send an empty AppendEntries to prevent election. + boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null; + long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress() + || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex(); + + long followerNextIndex = followerLogInformation.getNextIndex(); AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), super.getReplicatedToAllIndex()); + getLogEntryIndex(followerNextIndex - 1), + getLogEntryTerm(followerNextIndex - 1), entries, + leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion()); - if(!entries.isEmpty() || LOG.isTraceEnabled()) { - LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId, + if (!entries.isEmpty() || log.isTraceEnabled()) { + log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(), appendEntries); } - followerActor.tell(appendEntries.toSerializable(), actor()); + followerActor.tell(appendEntries, actor()); } /** + * Initiates a snapshot capture to install on a follower. + * + *

* Install Snapshot works as follows - * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor - * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log - * and makes a call to Leader's handleMessage , with SendInstallSnapshot message. - * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower - * 4. On complete, Follower sends back a InstallSnapshotReply. - * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower - * and replenishes the memory by deleting the snapshot in Replicated log. - * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply) - * then send the existing snapshot in chunks to the follower. - * @param followerId - * @param followerNextIndex + * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor. + * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to + * the Leader's handleMessage with a SendInstallSnapshot message. + * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to + * the Follower via InstallSnapshot messages. + * 4. For each chunk, the Follower sends back an InstallSnapshotReply. + * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that + * follower. + * 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot) + * then send the existing snapshot in chunks to the follower. + * + * @param followerId the id of the follower. + * @return true if capture was initiated, false otherwise. */ - private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { - if (!context.getReplicatedLog().isPresent(followerNextIndex) && - context.getReplicatedLog().isInSnapshot(followerNextIndex)) { + public boolean initiateCaptureSnapshot(final String followerId) { + FollowerLogInformation followerLogInfo = followerToLog.get(followerId); + if (snapshotHolder.isPresent()) { + // If a snapshot is present in the memory, most likely another install is in progress no need to capture + // snapshot. This could happen if another follower needs an install when one is going on. + final ActorSelection followerActor = context.getPeerActorSelection(followerId); + + // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState. + sendSnapshotChunk(followerActor, followerLogInfo); + return true; + } - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot. - // This could happen if another follower needs an install when one is going on. - final ActorSelection followerActor = context.getPeerActorSelection(followerId); - sendSnapshotChunk(followerActor, followerId); + boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); + if (captureInitiated) { + followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState( + context.getConfigParams().getSnapshotChunkSize(), logName())); + } + return captureInitiated; + } + + private boolean canInstallSnapshot(final long nextIndex) { + // If the follower's nextIndex is -1 then we might as well send it a snapshot + // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present + // in the snapshot + return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex) + && context.getReplicatedLog().isInSnapshot(nextIndex); - } else { - context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), - this.getReplicatedToAllIndex(), followerId); - } - } } private void sendInstallSnapshot() { - LOG.debug("{}: sendInstallSnapshot", logName()); + log.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { - ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); + String followerId = e.getKey(); + ActorSelection followerActor = context.getPeerActorSelection(followerId); + FollowerLogInformation followerLogInfo = e.getValue(); if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - sendSnapshotChunk(followerActor, e.getKey()); + long nextIndex = followerLogInfo.getNextIndex(); + if (followerLogInfo.getInstallSnapshotState() != null + || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED + || canInstallSnapshot(nextIndex)) { + sendSnapshotChunk(followerActor, followerLogInfo); } } } @@ -590,59 +896,62 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * Sends a snapshot chunk to a given follower * InstallSnapshot should qualify as a heartbeat too. */ - private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { - try { - if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo) { + if (snapshotHolder.isPresent()) { + LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState(); + if (installSnapshotState == null) { + installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(), + logName()); + followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState); + } + + try { + // Ensure the snapshot bytes are set - this is a no-op. + installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); + + if (!installSnapshotState.canSendNextChunk()) { + return; + } - // Note: the previous call to getNextSnapshotChunk has the side-effect of adding - // followerId to the followerToSnapshot map. - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); + + log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), + nextSnapshotChunk.length); + + int nextChunkIndex = installSnapshotState.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if (installSnapshotState.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), + snapshotHolder.get().getLastIncludedIndex(), + snapshotHolder.get().getLastIncludedTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), - followerToSnapshot.getTotalChunks(), - Optional.of(followerToSnapshot.getLastChunkHashCode()) - ).toSerializable(), + nextChunkIndex, + installSnapshotState.getTotalChunks(), + Optional.of(installSnapshotState.getLastChunkHashCode()), + serverConfig + ).toSerializable(followerLogInfo.getRaftVersion()), actor() ); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), - followerToSnapshot.getTotalChunks()); - } + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e); - } - } - /** - * Acccepts snaphot as ByteString, enters into map for future chunks - * creates and return a ByteString chunk - */ - private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); - if (followerToSnapshot == null) { - followerToSnapshot = new FollowerToSnapshot(snapshotBytes); - mapFollowerToSnapshot.put(followerId, followerToSnapshot); + log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), + installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); } - ByteString nextChunk = followerToSnapshot.getNextChunk(); - - LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size()); - - return nextChunk; } private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - LOG.trace("{}: Sending heartbeat", logName()); + log.trace("{}: Sending heartbeat", logName()); sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true); + + appendEntriesMessageSlicer.checkExpiredSlicedMessageState(); } } @@ -652,7 +961,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - private void scheduleHeartBeat(FiniteDuration interval) { + private void scheduleHeartBeat(final FiniteDuration interval) { if (followerToLog.isEmpty()) { // Optimization - do not bother scheduling a heartbeat as there are // no followers @@ -667,147 +976,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // need to be sent if there are other messages being sent to the remote // actor. heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( - interval, context.getActor(), new SendHeartBeat(), + interval, context.getActor(), SendHeartBeat.INSTANCE, context.getActorSystem().dispatcher(), context.getActor()); } @Override - public void close() throws Exception { + public void close() { stopHeartBeat(); + appendEntriesMessageSlicer.close(); } @Override - public String getLeaderId() { + public final String getLeaderId() { return context.getId(); } + @Override + public final short getLeaderPayloadVersion() { + return context.getPayloadVersion(); + } + protected boolean isLeaderIsolated() { - int minPresent = minIsolatedLeaderPeerCount; + int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { - if (followerLogInformation.isFollowerActive()) { + final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId()); + if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { --minPresent; if (minPresent == 0) { - break; + return false; } } } - return (minPresent != 0); - } - - /** - * Encapsulates the snapshot bytestring and handles the logic of sending - * snapshot chunks - */ - protected class FollowerToSnapshot { - private final ByteString snapshotBytes; - private int offset = 0; - // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset - private int replyReceivedForOffset; - // if replyStatus is false, the previous chunk is attempted - private boolean replyStatus = false; - private int chunkIndex; - private final int totalChunks; - private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - - public FollowerToSnapshot(ByteString snapshotBytes) { - this.snapshotBytes = snapshotBytes; - int size = snapshotBytes.size(); - totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + - ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", - logName(), size, totalChunks); - } - replyReceivedForOffset = -1; - chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; - } - - public ByteString getSnapshotBytes() { - return snapshotBytes; - } - - public int incrementOffset() { - if(replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - offset = offset + context.getConfigParams().getSnapshotChunkSize(); - } - return offset; - } - - public int incrementChunkIndex() { - if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - chunkIndex = chunkIndex + 1; - } - return chunkIndex; - } - - public int getChunkIndex() { - return chunkIndex; - } - - public int getTotalChunks() { - return totalChunks; - } - - public boolean canSendNextChunk() { - // we only send a false if a chunk is sent but we have not received a reply yet - return replyReceivedForOffset == offset; - } - - public boolean isLastChunk(int chunkIndex) { - return totalChunks == chunkIndex; - } - - public void markSendStatus(boolean success) { - if (success) { - // if the chunk sent was successful - replyReceivedForOffset = offset; - replyStatus = true; - lastChunkHashCode = nextChunkHashCode; - } else { - // if the chunk sent was failure - replyReceivedForOffset = offset; - replyStatus = false; - } - } - - public ByteString getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); - int start = incrementOffset(); - int size = context.getConfigParams().getSnapshotChunkSize(); - if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { - size = snapshotLength; - } else { - if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } - } - - - LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(), - snapshotLength, start, size); - - ByteString substring = getSnapshotBytes().substring(start, start + size); - nextChunkHashCode = substring.hashCode(); - return substring; - } - - /** - * reset should be called when the Follower needs to be sent the snapshot from the beginning - */ - public void reset(){ - offset = 0; - replyStatus = false; - replyReceivedForOffset = offset; - chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; - lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - } - - public int getLastChunkHashCode() { - return lastChunkHashCode; - } + return minPresent != 0; } // called from example-actor for printing the follower-states @@ -828,22 +1028,36 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - public FollowerLogInformation getFollower(String followerId) { + public FollowerLogInformation getFollower(final String followerId) { return followerToLog.get(followerId); } @VisibleForTesting - protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) { - mapFollowerToSnapshot.put(followerId, snapshot); + public int followerLogSize() { + return followerToLog.size(); } - @VisibleForTesting - public int followerSnapshotSize() { - return mapFollowerToSnapshot.size(); - } + static class SnapshotHolder { + private final long lastIncludedTerm; + private final long lastIncludedIndex; + private final ByteSource snapshotBytes; - @VisibleForTesting - public int followerLogSize() { - return followerToLog.size(); + SnapshotHolder(final Snapshot snapshot, final ByteSource snapshotBytes) { + this.lastIncludedTerm = snapshot.getLastAppliedTerm(); + this.lastIncludedIndex = snapshot.getLastAppliedIndex(); + this.snapshotBytes = snapshotBytes; + } + + long getLastIncludedTerm() { + return lastIncludedTerm; + } + + long getLastIncludedIndex() { + return lastIncludedIndex; + } + + ByteSource getSnapshotBytes() { + return snapshotBytes; + } } }