* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
-import com.google.protobuf.ByteString;
+import com.google.common.io.ByteSource;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Queue;
import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import scala.concurrent.duration.FiniteDuration;
/**
- * The behavior of a RaftActor when it is in the Leader state
- * <p/>
+ * The behavior of a RaftActor when it is in the Leader state.
+ *
+ * <p>
* Leaders:
* <ul>
* <li> Upon election: send initial empty AppendEntries RPCs
* respond after entry applied to state machine (§5.3)
* <li> If last log index ≥ nextIndex for a follower: send
* AppendEntries RPC with log entries starting at nextIndex
- * <ul>
* <li> If successful: update nextIndex and matchIndex for
* follower (§5.3)
* <li> If AppendEntries fails because of log inconsistency:
* decrement nextIndex and retry (§5.3)
- * </ul>
- * <li> If there exists an N such that N > commitIndex, a majority
+ * <li> If there exists an N such that N > commitIndex, a majority
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
+ * </ul>
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+ private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- // The index of the first chunk that is sent when installing a snapshot
- public static final int FIRST_CHUNK_INDEX = 1;
-
- // The index that the follower should respond with if it needs the install snapshot to be reset
- public static final int INVALID_CHUNK_INDEX = -1;
-
- // This would be passed as the hash code of the last chunk when sending the first chunk
- public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+ /**
+ * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+ * expect the entries to be modified in sequence, hence we open-code the lookup.
+ * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+ * but we already expect those to be far from frequent.
+ */
+ private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
- private final Map<String, FollowerLogInformation> followerToLog;
- private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
+ /**
+ * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the
+ * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold.
+ * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers.
+ */
+ private final Map<Long, SharedFileBackedOutputStream> sharedSerializedAppendEntriesStreams = new HashMap<>();
+ private final MessageSlicer appendEntriesMessageSlicer;
private Cancellable heartbeatSchedule = null;
-
- private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
-
- protected final int minReplicationCount;
-
- protected final int minIsolatedLeaderPeerCount;
-
- private Optional<ByteString> snapshot;
-
- public AbstractLeader(RaftActorContext context) {
- super(context);
-
- final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
- for (String followerId : context.getPeerAddresses().keySet()) {
- FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId,
- context.getCommitIndex(), -1,
- context.getConfigParams().getElectionTimeOutInterval());
-
- ftlBuilder.put(followerId, followerLogInformation);
+ private Optional<SnapshotHolder> snapshotHolder = Optional.empty();
+ private int minReplicationCount;
+
+ protected AbstractLeader(final RaftActorContext context, final RaftState state,
+ final @Nullable AbstractLeader initializeFromLeader) {
+ super(context, state);
+
+ appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
+ .messageSliceSize(context.getConfigParams().getSnapshotChunkSize())
+ .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
+ TimeUnit.MILLISECONDS).build();
+
+ if (initializeFromLeader != null) {
+ followerToLog.putAll(initializeFromLeader.followerToLog);
+ snapshotHolder = initializeFromLeader.snapshotHolder;
+ trackers.addAll(initializeFromLeader.trackers);
+ } else {
+ for (PeerInfo peerInfo: context.getPeers()) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context);
+ followerToLog.put(peerInfo.getId(), followerLogInformation);
+ }
}
- followerToLog = ftlBuilder.build();
-
- leaderId = context.getId();
- LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
+ log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
- minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
-
- // the isolated Leader peer count will be 1 less than the majority vote count.
- // this is because the vote count has the self vote counted in it
- // for e.g
- // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
- // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
- // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
- minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
-
- snapshot = Optional.absent();
+ updateMinReplicaCount();
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (§5.2)
- scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ sendAppendEntries(0, false);
+
+ // It is important to schedule this heartbeat here
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ }
+
+ protected AbstractLeader(final RaftActorContext context, final RaftState state) {
+ this(context, state, null);
}
/**
*
* @return Collection of follower IDs
*/
- protected final Collection<String> getFollowerIds() {
+ public final Collection<String> getFollowerIds() {
return followerToLog.keySet();
}
- private Optional<ByteString> getSnapshot() {
- return snapshot;
+ public void addFollower(final String followerId) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformation(context.getPeerInfo(followerId),
+ context);
+ followerToLog.put(followerId, followerLogInformation);
+
+ if (heartbeatSchedule == null) {
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ }
+ }
+
+ public void removeFollower(final String followerId) {
+ followerToLog.remove(followerId);
+ }
+
+ public void updateMinReplicaCount() {
+ int numVoting = 0;
+ for (PeerInfo peer: context.getPeers()) {
+ if (peer.isVoting()) {
+ numVoting++;
+ }
+ }
+
+ minReplicationCount = getMajorityVoteCount(numVoting);
+ }
+
+ protected int getMinIsolatedLeaderPeerCount() {
+ //the isolated Leader peer count will be 1 less than the majority vote count.
+ //this is because the vote count has the self vote counted in it
+ //for e.g
+ //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
+ //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
+ //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
+
+ return minReplicationCount > 0 ? minReplicationCount - 1 : 0;
+ }
+
+ @VisibleForTesting
+ void setSnapshotHolder(final @Nullable SnapshotHolder snapshotHolder) {
+ this.snapshotHolder = Optional.ofNullable(snapshotHolder);
}
@VisibleForTesting
- void setSnapshot(Optional<ByteString> snapshot) {
- this.snapshot = snapshot;
+ boolean hasSnapshot() {
+ return snapshotHolder.isPresent();
}
@Override
- protected RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ protected RaftActorBehavior handleAppendEntries(final ActorRef sender,
+ final AppendEntries appendEntries) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
- }
+ log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
return this;
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
-
- if(! appendEntriesReply.isSuccess()) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
- }
- }
+ protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+ final AppendEntriesReply appendEntriesReply) {
+ log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- if(followerLogInformation == null){
- LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
+ if (followerLogInformation == null) {
+ log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
return this;
}
- followerLogInformation.markFollowerActive();
+ final long lastActivityNanos = followerLogInformation.nanosSinceLastActivity();
+ if (lastActivityNanos > context.getConfigParams().getElectionTimeOutInterval().toNanos()) {
+ log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
+ + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
+ logName(), appendEntriesReply, TimeUnit.NANOSECONDS.toMillis(lastActivityNanos),
+ context.getLastApplied(), context.getCommitIndex());
+ }
- if (appendEntriesReply.isSuccess()) {
- followerLogInformation
- .setMatchIndex(appendEntriesReply.getLogLastIndex());
- followerLogInformation
- .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+ followerLogInformation.markFollowerActive();
+ followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
+ followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+ followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress());
+
+ long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+ boolean updated = false;
+ if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
+ // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
+ // in raft as a node cannot become leader if it's log is behind another's. However, the
+ // non-voting semantics deviate a bit from raft. Only voting members participate in
+ // elections and can become leader so it's possible for a non-voting follower to be ahead
+ // of the leader. This can happen if persistence is disabled and all voting members are
+ // restarted. In this case, the voting leader will start out with an empty log however
+ // the non-voting followers still retain the previous data in memory. On the first
+ // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex
+ // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned
+ // lastLogIndex may be higher in which case we want to reset the follower by installing a
+ // snapshot. It's also possible that the follower's last log index is behind the leader's.
+ // However in this case the log terms won't match and the logs will conflict - this is handled
+ // elsewhere.
+ log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+ + "(snapshotIndex {}, snapshotTerm {}) - forcing install snaphot", logName(),
+ followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
+ context.getReplicatedLog().lastIndex(), context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm());
+
+ followerLogInformation.setMatchIndex(-1);
+ followerLogInformation.setNextIndex(-1);
+
+ initiateCaptureSnapshot(followerId);
+
+ updated = true;
+ } else if (appendEntriesReply.isSuccess()) {
+ long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
+ if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
+ && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
+ // The follower's last entry is present in the leader's journal but the terms don't match so the
+ // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
+ // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
+ // and no longer in the journal. Either way, we set the follower's next index to 1 less than the last
+ // index reported by the follower. For the former case, the leader will send all entries starting with
+ // the previous follower's index and the follower will remove and replace the conflicting entries as
+ // needed. For the latter, the leader will initiate an install snapshot.
+
+ followerLogInformation.setNextIndex(followerLastLogIndex - 1);
+ updated = true;
+
+ log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+ + "leader's {} - set the follower's next index to {}", logName(),
+ followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
+ followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+ } else {
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+ }
} else {
+ log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}, "
+ + "snapshotTerm: {}, replicatedToAllIndex: {}", logName(), appendEntriesReply,
+ context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(),
+ getReplicatedToAllIndex());
+
+ long followersLastLogTermInLeadersLogOrSnapshot = getLogEntryOrSnapshotTerm(followerLastLogIndex);
+ if (appendEntriesReply.isForceInstallSnapshot()) {
+ // Reset the followers match and next index. This is to signal that this follower has nothing
+ // in common with this Leader and so would require a snapshot to be installed
+ followerLogInformation.setMatchIndex(-1);
+ followerLogInformation.setNextIndex(-1);
+
+ // Force initiate a snapshot capture
+ initiateCaptureSnapshot(followerId);
+ } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLogOrSnapshot >= 0
+ && followersLastLogTermInLeadersLogOrSnapshot == appendEntriesReply.getLogLastTerm()) {
+ // The follower's log is empty or the follower's last entry is present in the leader's journal or
+ // snapshot and the terms match so the follower is just behind the leader's journal from the last
+ // snapshot, if any. We'll catch up the follower quickly by starting at the follower's last log index.
+
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+ log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+ + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+ followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
+ } else {
+ // The follower's log conflicts with leader's log so decrement follower's next index
+ // in an attempt to find where the logs match.
+ if (followerLogInformation.decrNextIndex(appendEntriesReply.getLogLastIndex())) {
+ updated = true;
+
+ log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+ logName(), followerId, appendEntriesReply.getLogLastTerm(),
+ followersLastLogTermInLeadersLogOrSnapshot, followerLogInformation.getNextIndex());
+ }
+ }
+ }
- // TODO: When we find that the follower is out of sync with the
- // Leader we simply decrement that followers next index by 1.
- // Would it be possible to do better than this? The RAFT spec
- // does not explicitly deal with it but may be something for us to
- // think about
-
- followerLogInformation.decrNextIndex();
+ if (log.isTraceEnabled()) {
+ log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+ logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
}
- // Now figure out if this reply warrants a change in the commitIndex
- // If there exists an N such that N > commitIndex, a majority
- // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
- // set commitIndex = N (§5.3, §5.4).
- for (long N = context.getCommitIndex() + 1; ; N++) {
- int replicatedCount = 1;
+ possiblyUpdateCommitIndex();
+
+ //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+ sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+ return this;
+ }
+
+ private void possiblyUpdateCommitIndex() {
+ // Figure out if we can update the the commitIndex as follows:
+ // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
+ // and log[N].term == currentTerm:
+ // set commitIndex = N (§5.3, §5.4).
+ for (long index = context.getCommitIndex() + 1; ; index++) {
+ ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+ if (replicatedLogEntry == null) {
+ log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().size());
+ break;
+ }
+
+ // Count our entry if it has been persisted.
+ int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+ if (replicatedCount == 0) {
+ // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+ // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+ // amongst the followers w/o the local persistence ack.
+ break;
+ }
+ log.trace("{}: checking Nth index {}", logName(), index);
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex() >= N) {
+ final PeerInfo peerInfo = context.getPeerInfo(info.getId());
+ if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) {
replicatedCount++;
+ } else if (log.isTraceEnabled()) {
+ log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+ info.getMatchIndex(), peerInfo);
}
}
+ if (log.isTraceEnabled()) {
+ log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount,
+ minReplicationCount);
+ }
+
if (replicatedCount >= minReplicationCount) {
- ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null &&
- replicatedLogEntry.getTerm() == currentTerm()) {
- context.setCommitIndex(N);
+ // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+ // "Raft never commits log entries from previous terms by counting replicas".
+ // However we keep looping so we can make progress when new entries in the current term
+ // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+ // counting replicas, then all prior entries are committed indirectly".
+ if (replicatedLogEntry.getTerm() == currentTerm()) {
+ log.trace("{}: Setting commit index to {}", logName(), index);
+ context.setCommitIndex(index);
+ } else {
+ log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, "
+ + "term {} does not match the current term {}", logName(), index,
+ replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
}
} else {
+ log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
break;
}
}
// Apply the change to the state machine
if (context.getCommitIndex() > context.getLastApplied()) {
+ log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+ context.getCommitIndex(), context.getLastApplied());
+
applyLogToStateMachine(context.getCommitIndex());
}
- return this;
+ if (!context.getSnapshotManager().isCapturing()) {
+ purgeInMemoryLog();
+ }
}
- @Override
- protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
- final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ private boolean updateFollowerLogInformation(final FollowerLogInformation followerLogInformation,
+ final AppendEntriesReply appendEntriesReply) {
+ boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
+ updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
+
+ if (updated && log.isDebugEnabled()) {
+ log.debug(
+ "{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
+ logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
+ }
+ return updated;
+ }
+
+ private void purgeInMemoryLog() {
+ //find the lowest index across followers which has been replicated to all.
+ // lastApplied if there are no followers, so that we keep clearing the log for single-node
+ // we would delete the in-mem log from that index on, in-order to minimize mem usage
+ // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
+ for (FollowerLogInformation info : followerToLog.values()) {
+ minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+ }
+
+ super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
+ }
+
+ /**
+ * Removes and returns the ClientRequestTracker for the specified log index.
+ * @param logIndex the log index
+ * @return the ClientRequestTracker or null if none available
+ */
+ private ClientRequestTracker removeClientRequestTracker(final long logIndex) {
+ final Iterator<ClientRequestTracker> it = trackers.iterator();
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
if (t.getIndex() == logIndex) {
}
@Override
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- for (ClientRequestTracker tracker : trackerList) {
- if (tracker.getIndex() == logIndex) {
- return tracker;
- }
+ final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+ // first check whether a ClientRequestTracker exists for this entry.
+ // If it does that means the leader wasn't dropped before the transaction applied.
+ // That means that this transaction can be safely applied as a local transaction since we
+ // have the ClientRequestTracker.
+ final ClientRequestTracker tracker = removeClientRequestTracker(entry.getIndex());
+ if (tracker != null) {
+ return new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), entry);
}
- return null;
+
+ // Tracker is missing, this means that we switched behaviours between replicate and applystate
+ // and became the leader again,. We still want to apply this as a local modification because
+ // we have resumed leadership with that log entry having been committed.
+ final Payload payload = entry.getData();
+ if (payload instanceof IdentifiablePayload) {
+ return new ApplyState(null, ((IdentifiablePayload<?>) payload).getIdentifier(), entry);
+ }
+
+ return new ApplyState(null, null, entry);
}
@Override
- protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply) {
+ protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
return this;
}
- @Override
- public RaftState state() {
- return RaftState.Leader;
+ protected void beforeSendHeartbeat() {
+ // No-op
}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
- Preconditions.checkNotNull(sender, "sender should not be null");
+ public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
+ requireNonNull(sender, "sender should not be null");
- Object message = fromSerializableMessage(originalMessage);
+ if (appendEntriesMessageSlicer.handleMessage(message)) {
+ return this;
+ }
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
+ log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return switchBehavior(new Follower(context));
+ // This is a special case. Normally when stepping down as leader we don't process and reply to the
+ // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+ // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+ // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+ // state and starting a new election and grabbing leadership back before the other candidate node can
+ // start a new election due to lack of responses. This case would only occur if there isn't a majority
+ // of other nodes available that can elect the requesting candidate. Since we're transferring
+ // leadership, we should make every effort to get the requesting node elected.
+ if (rpc instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+ log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+ super.handleMessage(sender, rpc);
+ }
+
+ return internalSwitchBehavior(RaftState.Follower);
}
}
- try {
- if (message instanceof SendHeartBeat) {
- sendHeartBeat();
- return this;
-
- } else if(message instanceof InitiateInstallSnapshot) {
- installSnapshotIfNeeded();
-
- } else if(message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
- sendInstallSnapshot();
-
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
- handleInstallSnapshotReply((InstallSnapshotReply) message);
-
- }
- } finally {
+ if (message instanceof SendHeartBeat) {
+ beforeSendHeartbeat();
+ sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ } else if (message instanceof SendInstallSnapshot) {
+ SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+ setSnapshotHolder(new SnapshotHolder(sendInstallSnapshot.getSnapshot(),
+ sendInstallSnapshot.getSnapshotBytes()));
+ sendInstallSnapshot();
+ } else if (message instanceof Replicate) {
+ replicate((Replicate) message);
+ } else if (message instanceof InstallSnapshotReply) {
+ handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof CheckConsensusReached) {
+ possiblyUpdateCommitIndex();
+ } else {
+ return super.handleMessage(sender, message);
}
- return super.handleMessage(sender, message);
+ return this;
}
- private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
+ @SuppressFBWarnings(value = "NP_NULL_PARAM_DEREF_ALL_TARGETS_DANGEROUS",
+ justification = "JDT nullness with SpotBugs at setSnapshotHolder(null)")
+ private void handleInstallSnapshotReply(final InstallSnapshotReply reply) {
+ log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
+
String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- followerLogInformation.markFollowerActive();
+ if (followerLogInformation == null) {
+ // This can happen during AddServer if it times out.
+ log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
+ return;
+ }
- if (followerToSnapshot != null &&
- followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+ if (installSnapshotState == null) {
+ log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
+ logName(), followerId);
+ return;
+ }
+
+ installSnapshotState.resetChunkTimer();
+ followerLogInformation.markFollowerActive();
+ if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
+ boolean wasLastChunk = false;
if (reply.isSuccess()) {
- if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+ if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: InstallSnapshotReply received, " +
- "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- context.getId(), reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1
- );
- }
- followerLogInformation.setMatchIndex(
- context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation.setNextIndex(
- context.getReplicatedLog().getSnapshotIndex() + 1);
- mapFollowerToSnapshot.remove(followerId);
+ long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
+ followerLogInformation.setMatchIndex(followerMatchIndex);
+ followerLogInformation.setNextIndex(followerMatchIndex + 1);
+ followerLogInformation.clearLeaderInstallSnapshotState();
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" +
- context.getId(), followerToLog.get(followerId).getNextIndex());
- }
+ log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+ + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+ followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
- if (mapFollowerToSnapshot.isEmpty()) {
+ if (!anyFollowersInstallingSnapshot()) {
// once there are no pending followers receiving snapshots
// we can remove snapshot from the memory
- setSnapshot(Optional.<ByteString>absent());
+ setSnapshotHolder(null);
}
+ wasLastChunk = true;
+ if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
+ new UnInitializedFollowerSnapshotReply(followerId);
+ context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
+ log.debug("Sent message UnInitializedFollowerSnapshotReply to self");
+ }
} else {
- followerToSnapshot.markSendStatus(true);
+ installSnapshotState.markSendStatus(true);
}
} else {
- LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
- context.getId(), reply.getChunkIndex());
+ log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
- followerToSnapshot.markSendStatus(false);
+ installSnapshotState.markSendStatus(false);
+ }
+
+ if (wasLastChunk) {
+ if (!context.getSnapshotManager().isCapturing()) {
+ // Since the follower is now caught up try to purge the log.
+ purgeInMemoryLog();
+ }
+ } else {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if (followerActor != null) {
+ sendSnapshotChunk(followerActor, followerLogInformation);
+ }
}
} else {
- LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
- );
+ log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+ logName(), reply.getChunkIndex(), followerId,
+ installSnapshotState.getChunkIndex());
- if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+ if (reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) {
// Since the Follower did not find this index to be valid we should reset the follower snapshot
// so that Installing the snapshot can resume from the beginning
- followerToSnapshot.reset();
+ installSnapshotState.reset();
}
}
}
- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
+ private boolean anyFollowersInstallingSnapshot() {
+ for (FollowerLogInformation info: followerToLog.values()) {
+ if (info.getInstallSnapshotState() != null) {
+ return true;
+ }
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
}
+ return false;
+ }
+
+ private void replicate(final Replicate replicate) {
+ long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+ log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
+ replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
+ replicate.isSendImmediate());
+
// Create a tracker entry we will use this later to notify the
// client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
+ if (replicate.getClientActor() != null) {
+ trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
+ logIndex));
+ }
- if (followerToLog.isEmpty()) {
+ boolean applyModificationToState = !context.anyVotingPeers()
+ || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
+
+ if (applyModificationToState) {
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
- } else {
- sendAppendEntries();
+ }
+
+ if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
+ sendAppendEntries(0, false);
}
}
- private void sendAppendEntries() {
+ protected void sendAppendEntries(final long timeSinceLastActivityIntervalNanos, final boolean isHeartbeat) {
// Send an AppendEntries to all followers
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ final FollowerLogInformation followerLogInformation = e.getValue();
+ // This checks helps not to send a repeat message to the follower
+ if (!followerLogInformation.isFollowerActive()
+ || followerLogInformation.nanosSinceLastActivity() >= timeSinceLastActivityIntervalNanos) {
+ sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
+ }
+ }
+ }
- if (followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex();
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
-
- if (mapFollowerToSnapshot.get(followerId) != null) {
- // if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- } else {
- // we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList());
+ /**
+ * This method checks if any update needs to be sent to the given follower. This includes append log entries,
+ * sending next snapshot chunk, and initiating a snapshot.
+ */
+ private void sendUpdatesToFollower(final String followerId, final FollowerLogInformation followerLogInformation,
+ final boolean sendHeartbeat, final boolean isHeartbeat) {
+
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if (followerActor != null) {
+ long followerNextIndex = followerLogInformation.getNextIndex();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ boolean sendAppendEntries = false;
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+ LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+ if (installSnapshotState != null) {
+
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive) {
+ // 30 seconds with default settings, can be modified via heartbeat or election timeout factor
+ FiniteDuration snapshotReplyTimeout = context.getConfigParams().getHeartBeatInterval()
+ .$times(context.getConfigParams().getElectionTimeoutFactor() * 3);
+
+ if (installSnapshotState.isChunkTimedOut(snapshotReplyTimeout)) {
+ sendAppendEntries = !resendSnapshotChunk(followerActor, followerLogInformation);
+ } else if (installSnapshotState.canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerLogInformation);
}
+ } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntries = true;
+ }
+ } else if (followerLogInformation.isLogEntrySlicingInProgress()) {
+ sendAppendEntries = sendHeartbeat;
+ } else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- } else {
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- final List<ReplicatedLogEntry> entries;
-
- if (isFollowerActive &&
- context.getReplicatedLog().isPresent(followerNextIndex)) {
- // FIXME : Sending one entry at a time
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- } else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex ) {
- // if the followers next index is not present in the leaders log, and
- // if the follower is just not starting and if leader's index is more than followers index
- // then snapshot should be sent
-
- if(LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
- "follower-nextIndex: %s, leader-snapshot-index: %s, " +
- "leader-last-index: %s", context.getId(), followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
- }
- actor().tell(new InitiateInstallSnapshot(), actor());
-
- // we would want to sent AE as the capture snapshot might take time
- entries = Collections.<ReplicatedLogEntry>emptyList();
+ if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) {
+ log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, "
+ + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive,
+ followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+ }
+ if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+
+ log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
+ followerNextIndex, followerId);
+
+ if (followerLogInformation.okToReplicate(context.getCommitIndex())) {
+ entries = getEntriesToSend(followerLogInformation, followerActor);
+ sendAppendEntries = true;
+ }
+ } else if (isFollowerActive && followerNextIndex >= 0
+ && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ // Send heartbeat to follower whenever install snapshot is initiated.
+ sendAppendEntries = true;
+ if (canInstallSnapshot(followerNextIndex)) {
+ log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+ + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
+
+ initiateCaptureSnapshot(followerId);
} else {
- //we send an AppendEntries, even if the follower is inactive
- // in-order to update the followers timestamp, in case it becomes active again
- entries = Collections.<ReplicatedLogEntry>emptyList();
+ // It doesn't seem like we should ever reach here - most likely indicates sonething is
+ // wrong.
+ log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+ + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+ followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+ context.getReplicatedLog().size());
}
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
+ } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) {
+ // we send an AppendEntries, even if the follower is inactive
+ // in-order to update the followers timestamp, in case it becomes active again
+ sendAppendEntries = true;
}
+
+ }
+
+ if (sendAppendEntries) {
+ sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
}
}
}
- private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries) {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
+ private List<ReplicatedLogEntry> getEntriesToSend(final FollowerLogInformation followerLogInfo,
+ final ActorSelection followerActor) {
+ // Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries
+ // message.
+ int maxEntries = (int) context.getReplicatedLog().size();
+ final int maxDataSize = context.getConfigParams().getSnapshotChunkSize();
+ final long followerNextIndex = followerLogInfo.getNextIndex();
+ List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex,
+ maxEntries, maxDataSize);
+
+ // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
+ // that is the case, then we need to slice it into smaller chunks.
+ if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) {
+ // Don't need to slice.
+ return entries;
+ }
+
+ log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(),
+ maxDataSize);
+
+ // If an AppendEntries has already been serialized for the log index then reuse the
+ // SharedFileBackedOutputStream.
+ final Long logIndex = entries.get(0).getIndex();
+ SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex);
+ if (fileBackedStream == null) {
+ fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance();
+
+ final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+ getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries,
+ context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion());
+
+ log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries,
+ followerLogInfo.getId());
+
+ try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+ out.writeObject(appendEntries);
+ } catch (IOException e) {
+ log.error("{}: Error serializing {}", logName(), appendEntries, e);
+ fileBackedStream.cleanup();
+ return Collections.emptyList();
+ }
+
+ sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream);
+
+ fileBackedStream.setOnCleanupCallback(index -> {
+ log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index);
+ sharedSerializedAppendEntriesStreams.remove(index);
+ }, logIndex);
+ } else {
+ log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId());
+ fileBackedStream.incrementUsageCount();
+ }
+
+ log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId());
+
+ // Record that slicing is in progress for the follower.
+ followerLogInfo.setSlicedLogEntryIndex(logIndex);
+
+ final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId());
+ appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier)
+ .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor())
+ .onFailureCallback(failure -> {
+ log.error("{}: Error slicing AppendEntries for follower {}", logName(),
+ followerLogInfo.getId(), failure);
+ followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX);
+ }).build());
+
+ return Collections.emptyList();
+ }
+
+ private void sendAppendEntriesToFollower(final ActorSelection followerActor, final List<ReplicatedLogEntry> entries,
+ final FollowerLogInformation followerLogInformation) {
+ // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
+ // possibly committing and applying conflicting entries (those with same index, different term) from a prior
+ // term that weren't replicated to a majority, which would be a violation of raft.
+ // - if the follower isn't active. In this case we don't know the state of the follower and we send an
+ // empty AppendEntries as a heart beat to prevent election.
+ // - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
+ // need to send AppendEntries to prevent election.
+ // - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
+ // need to send an empty AppendEntries to prevent election.
+ boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
+ long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
+ || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();
+
+ long followerNextIndex = followerLogInformation.getNextIndex();
+ AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+ getLogEntryIndex(followerNextIndex - 1),
+ getLogEntryTerm(followerNextIndex - 1), entries,
+ leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion(),
+ followerLogInformation.getRaftVersion(), followerLogInformation.needsLeaderAddress(getId()));
+
+ if (!entries.isEmpty() || log.isTraceEnabled()) {
+ log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
+ appendEntries);
+ }
+
+ followerLogInformation.setSentCommitIndex(leaderCommitIndex);
+ followerActor.tell(appendEntries, actor());
}
/**
- * An installSnapshot is scheduled at a interval that is a multiple of
- * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
- * snapshots at every heartbeat.
+ * Initiates a snapshot capture to install on a follower.
*
+ * <p>
* Install Snapshot works as follows
- * 1. Leader sends a InitiateInstallSnapshot message to self
- * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
- * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 5. On complete, Follower sends back a InstallSnapshotReply.
- * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
- * and replenishes the memory by deleting the snapshot in Replicated log.
+ * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
+ * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
+ * the Leader's handleMessage with a SendInstallSnapshot message.
+ * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
+ * the Follower via InstallSnapshot messages.
+ * 4. For each chunk, the Follower sends back an InstallSnapshotReply.
+ * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
+ * follower.
+ * 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
+ * then send the existing snapshot in chunks to the follower.
*
+ * @param followerId the id of the follower.
+ * @return true if capture was initiated, false otherwise.
*/
- private void installSnapshotIfNeeded() {
- for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
- final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
-
- if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot
- sendSnapshotChunk(followerActor, e.getKey());
-
- } else {
- initiateCaptureSnapshot();
- //we just need 1 follower who would need snapshot to be installed.
- // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
- // who needs an install and send to all who need
- break;
- }
+ public boolean initiateCaptureSnapshot(final String followerId) {
+ FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
+ if (snapshotHolder.isPresent()) {
+ // If a snapshot is present in the memory, most likely another install is in progress no need to capture
+ // snapshot. This could happen if another follower needs an install when one is going on.
+ final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+ // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
+ sendSnapshotChunk(followerActor, followerLogInfo);
+ return true;
+ }
- }
- }
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
+ if (captureInitiated) {
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
}
- }
- // on every install snapshot, we try to capture the snapshot.
- // Once a capture is going on, another one issued will get ignored by RaftActor.
- private void initiateCaptureSnapshot() {
- LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
+ return captureInitiated;
+ }
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
+ private boolean canInstallSnapshot(final long nextIndex) {
+ // If the follower's nextIndex is -1 then we might as well send it a snapshot
+ // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
+ // in the snapshot
+ return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex);
- boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
}
private void sendInstallSnapshot() {
+ log.debug("{}: sendInstallSnapshot", logName());
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
- ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+ String followerId = e.getKey();
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ FollowerLogInformation followerLogInfo = e.getValue();
if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, e.getKey());
+ long nextIndex = followerLogInfo.getNextIndex();
+ if (followerLogInfo.getInstallSnapshotState() != null
+ || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED
+ || canInstallSnapshot(nextIndex)) {
+ sendSnapshotChunk(followerActor, followerLogInfo);
}
}
}
* Sends a snapshot chunk to a given follower
* InstallSnapshot should qualify as a heartbeat too.
*/
- private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
- try {
- if (snapshot.isPresent()) {
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,snapshot.get()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks(),
- Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
- ).toSerializable(),
- actor()
- );
- LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- context.getId(), followerActor.path(),
- mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo) {
+ if (snapshotHolder.isPresent()) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+ if (installSnapshotState == null) {
+ installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
+ logName());
+ followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
+ }
+
+ try {
+ // Ensure the snapshot bytes are set - this is a no-op.
+ installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
+
+ if (!installSnapshotState.canSendNextChunk()) {
+ return;
+ }
+
+ byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+
+ log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+ nextSnapshotChunk.length);
+
+ int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+ Optional<ServerConfigurationPayload> serverConfig = Optional.empty();
+ if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+ serverConfig = Optional.ofNullable(context.getPeerServerInfo(true));
+ }
+
+ sendSnapshotChunk(followerActor, followerLogInfo, nextSnapshotChunk, nextChunkIndex, serverConfig);
+
+ log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+ installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+
+ } catch (IOException e) {
+ log.warn("{}: Unable to send chunk: {}/{}. Reseting snapshot progress. Snapshot state: {}", logName(),
+ installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks(),
+ installSnapshotState);
+ installSnapshotState.reset();
}
- } catch (IOException e) {
- LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
}
}
- /**
- * Acccepts snaphot as ByteString, enters into map for future chunks
- * creates and return a ByteString chunk
- */
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot == null) {
- followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
- mapFollowerToSnapshot.put(followerId, followerToSnapshot);
- }
- ByteString nextChunk = followerToSnapshot.getNextChunk();
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
+ private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo,
+ final byte[] snapshotChunk, final int chunkIndex,
+ final Optional<ServerConfigurationPayload> serverConfig) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+
+ installSnapshotState.startChunkTimer();
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ snapshotHolder.get().getLastIncludedIndex(),
+ snapshotHolder.get().getLastIncludedTerm(),
+ snapshotChunk,
+ chunkIndex,
+ installSnapshotState.getTotalChunks(),
+ OptionalInt.of(installSnapshotState.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerLogInfo.getRaftVersion()),
+ actor()
+ );
+ }
+
+ private boolean resendSnapshotChunk(final ActorSelection followerActor,
+ final FollowerLogInformation followerLogInfo) {
+ if (!snapshotHolder.isPresent()) {
+ // Seems like we should never hit this case, but just in case we do, reset the snapshot progress so that it
+ // can restart from the next AppendEntries.
+ log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName());
+ followerLogInfo.clearLeaderInstallSnapshotState();
+ return false;
}
- return nextChunk;
+
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+ // we are resending, timer needs to be reset
+ installSnapshotState.resetChunkTimer();
+ installSnapshotState.markSendStatus(false);
+
+ sendSnapshotChunk(followerActor, followerLogInfo);
+
+ return true;
}
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
- sendAppendEntries();
+ log.trace("{}: Sending heartbeat", logName());
+ sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toNanos(), true);
+
+ appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
}
}
}
}
- private void scheduleHeartBeat(FiniteDuration interval) {
+ private void scheduleHeartBeat(final FiniteDuration interval) {
if (followerToLog.isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
// need to be sent if there are other messages being sent to the remote
// actor.
heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
+ interval, context.getActor(), SendHeartBeat.INSTANCE,
context.getActorSystem().dispatcher(), context.getActor());
}
@Override
- public void close() throws Exception {
+ public void close() {
stopHeartBeat();
+ appendEntriesMessageSlicer.close();
}
@Override
- public String getLeaderId() {
+ public final String getLeaderId() {
return context.getId();
}
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return context.getPayloadVersion();
+ }
+
protected boolean isLeaderIsolated() {
- int minPresent = minIsolatedLeaderPeerCount;
+ int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
- if (followerLogInformation.isFollowerActive()) {
+ final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+ if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
--minPresent;
if (minPresent == 0) {
- break;
- }
- }
- }
- return (minPresent != 0);
- }
-
- /**
- * Encapsulates the snapshot bytestring and handles the logic of sending
- * snapshot chunks
- */
- protected class FollowerToSnapshot {
- private final ByteString snapshotBytes;
- private int offset = 0;
- // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
- // if replyStatus is false, the previous chunk is attempted
- private boolean replyStatus = false;
- private int chunkIndex;
- private final int totalChunks;
- private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-
- public FollowerToSnapshot(ByteString snapshotBytes) {
- this.snapshotBytes = snapshotBytes;
- int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
- context.getId(), size, totalChunks);
- }
- replyReceivedForOffset = -1;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- }
-
- public ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
- public int incrementOffset() {
- if(replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- offset = offset + context.getConfigParams().getSnapshotChunkSize();
- }
- return offset;
- }
-
- public int incrementChunkIndex() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- chunkIndex = chunkIndex + 1;
- }
- return chunkIndex;
- }
-
- public int getChunkIndex() {
- return chunkIndex;
- }
-
- public int getTotalChunks() {
- return totalChunks;
- }
-
- public boolean canSendNextChunk() {
- // we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
- }
-
- public boolean isLastChunk(int chunkIndex) {
- return totalChunks == chunkIndex;
- }
-
- public void markSendStatus(boolean success) {
- if (success) {
- // if the chunk sent was successful
- replyReceivedForOffset = offset;
- replyStatus = true;
- lastChunkHashCode = nextChunkHashCode;
- } else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
- replyStatus = false;
- }
- }
-
- public ByteString getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = context.getConfigParams().getSnapshotChunkSize();
- if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
- size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
+ return false;
}
}
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
- snapshotLength, start, size);
- }
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
- }
-
- /**
- * reset should be called when the Follower needs to be sent the snapshot from the beginning
- */
- public void reset(){
- offset = 0;
- replyStatus = false;
- replyReceivedForOffset = offset;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- }
-
- public int getLastChunkHashCode() {
- return lastChunkHashCode;
}
+ return minPresent != 0;
}
// called from example-actor for printing the follower-states
}
@VisibleForTesting
- public FollowerLogInformation getFollower(String followerId) {
+ public FollowerLogInformation getFollower(final String followerId) {
return followerToLog.get(followerId);
}
@VisibleForTesting
- protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
- mapFollowerToSnapshot.put(followerId, snapshot);
+ public int followerLogSize() {
+ return followerToLog.size();
}
- @VisibleForTesting
- public int followerSnapshotSize() {
- return mapFollowerToSnapshot.size();
- }
+ static class SnapshotHolder {
+ private final long lastIncludedTerm;
+ private final long lastIncludedIndex;
+ private final ByteSource snapshotBytes;
- @VisibleForTesting
- public int followerLogSize() {
- return followerToLog.size();
+ SnapshotHolder(final Snapshot snapshot, final ByteSource snapshotBytes) {
+ this.lastIncludedTerm = snapshot.getLastAppliedTerm();
+ this.lastIncludedIndex = snapshot.getLastAppliedIndex();
+ this.snapshotBytes = snapshotBytes;
+ }
+
+ long getLastIncludedTerm() {
+ return lastIncludedTerm;
+ }
+
+ long getLastIncludedIndex() {
+ return lastIncludedIndex;
+ }
+
+ ByteSource getSnapshotBytes() {
+ return snapshotBytes;
+ }
}
}