X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=a40e75f4c3b2e5629cf2904891a61c7819ee12c9;hp=31bf99c2dc2bc9f5564c256294c605f6af1bd482;hb=1b0f84c4957e464bad6f7cb7350a8171c3d1621b;hpb=a81d98f692b80c45bce3fe6a87e731abfb012a9f

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
index 31bf99c2dc..a40e75f4c3 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
@@ -14,7 +14,9 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -24,7 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -33,8 +39,8 @@ import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -43,9 +49,11 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
  *
@@ -81,17 +89,30 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
+    /**
+     * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the
+     * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold.
+     * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers.
+     */
+    private final Map<Long, SharedFileBackedOutputStream> sharedSerializedAppendEntriesStreams = new HashMap<>();
+    private final MessageSlicer appendEntriesMessageSlicer;
+
     private Cancellable heartbeatSchedule = null;
-    private Optional<SnapshotHolder> snapshot = Optional.absent();
+    private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
 
-    protected AbstractLeader(RaftActorContext context, RaftState state,
-            @Nullable AbstractLeader initializeFromLeader) {
+    protected AbstractLeader(final RaftActorContext context, final RaftState state,
+            @Nullable final AbstractLeader initializeFromLeader) {
         super(context, state);
 
+        appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
+                .messageSliceSize(context.getConfigParams().getSnapshotChunkSize())
+                .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
+                        TimeUnit.MILLISECONDS).build();
+
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
-            snapshot = initializeFromLeader.snapshot;
+            snapshotHolder = initializeFromLeader.snapshotHolder;
             trackers.addAll(initializeFromLeader.trackers);
         } else {
             for (PeerInfo peerInfo: context.getPeers()) {
@@ -114,7 +135,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
     }
 
-    protected AbstractLeader(RaftActorContext context, RaftState state) {
+    protected AbstractLeader(final RaftActorContext context, final RaftState state) {
         this(context, state, null);
     }
 
@@ -127,7 +148,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.keySet();
     }
 
-    public void addFollower(String followerId) {
+    public void addFollower(final String followerId) {
         FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
                 context.getPeerInfo(followerId), -1, context);
         followerToLog.put(followerId, followerLogInformation);
@@ -137,7 +158,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
-    public void removeFollower(String followerId) {
+    public void removeFollower(final String followerId) {
         followerToLog.remove(followerId);
     }
 
@@ -164,22 +185,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    void setSnapshot(@Nullable Snapshot snapshot) {
-        if (snapshot != null) {
-            this.snapshot = Optional.of(new SnapshotHolder(snapshot));
-        } else {
-            this.snapshot = Optional.absent();
-        }
+    void setSnapshot(@Nullable final SnapshotHolder snapshotHolder) {
+        this.snapshotHolder = Optional.fromNullable(snapshotHolder);
     }
 
     @VisibleForTesting
     boolean hasSnapshot() {
-        return snapshot.isPresent();
+        return snapshotHolder.isPresent();
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
+    protected RaftActorBehavior handleAppendEntries(final ActorRef sender,
+        final AppendEntries appendEntries) {
 
         log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
 
@@ -187,7 +204,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+    protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+            final AppendEntriesReply appendEntriesReply) {
         log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
 
         // Update the FollowerLogInformation
@@ -228,9 +246,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // snapshot. It's also possible that the follower's last log index is behind the leader's.
             // However in this case the log terms won't match and the logs will conflict - this is handled
             // elsewhere.
-            log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
-                    + "forcing install snaphot", logName(), followerLogInformation.getId(),
-                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
+            log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+                    + "(snapshotIndex {}) - forcing install snapshot", logName(), followerLogInformation.getId(),
+                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+                    context.getReplicatedLog().getSnapshotIndex());
 
             followerLogInformation.setMatchIndex(-1);
             followerLogInformation.setNextIndex(-1);
@@ -252,7 +271,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     followerLogInformation.setNextIndex(followerLastLogIndex - 1);
                     updated = true;
 
-                    log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+                    log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
                             + "leader's {} - set the follower's next index to {}", logName(), followerId,
                             appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
                             followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
@@ -260,7 +279,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
                 }
             } else {
-                log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+                log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+                        logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
 
                 if (appendEntriesReply.isForceInstallSnapshot()) {
                     // Reset the followers match and next index. This is to signal that this follower has nothing
@@ -278,30 +298,60 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // follower's last log index.
                     updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+                    log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+                            + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+                            followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
                 } else {
                     // The follower's log conflicts with leader's log so decrement follower's next index by 1
                     // in an attempt to find where the logs match.
-                    followerLogInformation.decrNextIndex();
-                    updated = true;
+                    if (followerLogInformation.decrNextIndex()) {
+                        updated = true;
 
-                    log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
-                            logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
-                            followerLogInformation.getNextIndex());
+                        log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+                                logName(), followerId, appendEntriesReply.getLogLastTerm(),
+                                followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+                    }
                 }
             }
         }
 
-        // Now figure out if this reply warrants a change in the commitIndex
-        // If there exists an N such that N > commitIndex, a majority
-        // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
-        // set commitIndex = N (§5.3, §5.4).
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
                     logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
         }
 
+        possiblyUpdateCommitIndex();
+
+        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+        return this;
+    }
+
+    private void possiblyUpdateCommitIndex() {
+        // Figure out if we can update the commitIndex as follows:
+        // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
+        // and log[N].term == currentTerm:
+        // set commitIndex = N (§5.3, §5.4).
         for (long index = context.getCommitIndex() + 1; ; index++) {
-            int replicatedCount = 1;
+            ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+            if (replicatedLogEntry == null) {
+                log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+                        logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().size());
+                break;
+            }
+
+            // Count our entry if it has been persisted.
+            int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+            if (replicatedCount == 0) {
+                // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+                // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+                // amongst the followers w/o the local persistence ack.
+                break;
+            }
 
             log.trace("{}: checking Nth index {}", logName(), index);
 
             for (FollowerLogInformation info : followerToLog.values()) {
@@ -320,14 +370,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if (replicatedCount >= minReplicationCount) {
-                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
-                if (replicatedLogEntry == null) {
-                    log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
-                            logName(), index, context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().size());
-                    break;
-                }
-
                 // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
                 // "Raft never commits log entries from previous terms by counting replicas".
                 // However we keep looping so we can make progress when new entries in the current term
@@ -349,11 +391,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Apply the change to the state machine
         if (context.getCommitIndex() > context.getLastApplied()) {
-            if (log.isDebugEnabled()) {
-                log.debug(
-                    "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
-                    logName(), followerId, context.getCommitIndex(), context.getLastApplied());
-            }
+            log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+                    context.getCommitIndex(), context.getLastApplied());
 
             applyLogToStateMachine(context.getCommitIndex());
         }
@@ -361,15 +400,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
-
-        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
-        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
-
-        return this;
     }
 
-    private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
-            AppendEntriesReply appendEntriesReply) {
+    private boolean updateFollowerLogInformation(final FollowerLogInformation followerLogInformation,
+            final AppendEntriesReply appendEntriesReply) {
         boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
         updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
 
@@ -396,7 +430,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+    protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
         final Iterator<ClientRequestTracker> it = trackers.iterator();
         while (it.hasNext()) {
             final ClientRequestTracker t = it.next();
@@ -410,28 +444,45 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply) {
+    protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender,
+        final RequestVoteReply requestVoteReply) {
         return this;
     }
 
     protected void beforeSendHeartbeat(){}
 
     @Override
-    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+    public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
+        if (appendEntriesMessageSlicer.handleMessage(message)) {
+            return this;
+        }
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+                log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
+                // This is a special case. Normally when stepping down as leader we don't process and reply to the
+                // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+                // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+                // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+                // state and starting a new election and grabbing leadership back before the other candidate node can
+                // start a new election due to lack of responses. This case would only occur if there isn't a majority
+                // of other nodes available that can elect the requesting candidate. Since we're transferring
+                // leadership, we should make every effort to get the requesting node elected.
+                if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+                    log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+                    super.handleMessage(sender, message);
+                }
+
                 return internalSwitchBehavior(RaftState.Follower);
             }
         }
 
@@ -441,13 +492,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         } else if (message instanceof SendInstallSnapshot) {
-            // received from RaftActor
-            setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+            SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+            setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
             sendInstallSnapshot();
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
         } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
+        } else if (message instanceof CheckConsensusReached) {
+            possiblyUpdateCommitIndex();
         } else {
             return super.handleMessage(sender, message);
         }
@@ -455,7 +508,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
-    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
+    private void handleInstallSnapshotReply(final InstallSnapshotReply reply) {
         log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
 
         String followerId = reply.getFollowerId();
@@ -481,18 +534,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (reply.isSuccess()) {
             if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
                 //this was the last chunk reply
-                log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
-                        + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
-                        context.getReplicatedLog().getSnapshotIndex() + 1);
 
-                long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+                long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
                 followerLogInformation.setMatchIndex(followerMatchIndex);
                 followerLogInformation.setNextIndex(followerMatchIndex + 1);
                 followerLogInformation.clearLeaderInstallSnapshotState();
 
-                log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
-                        logName(), followerId, followerLogInformation.getMatchIndex(),
-                        followerLogInformation.getNextIndex());
+                log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+                        + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+                        followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
 
                 if (!anyFollowersInstallingSnapshot()) {
                     // once there are no pending followers receiving snapshots
@@ -511,16 +561,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     installSnapshotState.markSendStatus(true);
                 }
             } else {
-                log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
-                        logName(), reply.getChunkIndex());
+                log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
 
                 installSnapshotState.markSendStatus(false);
             }
 
-            if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
-                // Since the follower is now caught up try to purge the log.
-                purgeInMemoryLog();
-            } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
+            if (wasLastChunk) {
+                if (!context.getSnapshotManager().isCapturing()) {
+                    // Since the follower is now caught up try to purge the log.
+                    purgeInMemoryLog();
+                }
+            } else {
                 ActorSelection followerActor = context.getPeerActorSelection(followerId);
                 if (followerActor != null) {
                     sendSnapshotChunk(followerActor, followerLogInformation);
@@ -551,11 +602,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return false;
     }
 
-    private void replicate(Replicate replicate) {
+    private void replicate(final Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
-                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
+        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
+                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
+                replicate.isSendImmediate());
 
         // Create a tracker entry we will use this later to notify the
         // client actor
@@ -572,12 +624,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(logIndex);
         }
 
-        if (!followerToLog.isEmpty()) {
+        if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
             sendAppendEntries(0, false);
         }
     }
 
-    protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+    protected void sendAppendEntries(final long timeSinceLastActivityInterval, final boolean isHeartbeat) {
         // Send an AppendEntries to all followers
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
@@ -593,11 +645,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     /**
      * This method checks if any update needs to be sent to the given follower. This includes append log entries,
      * sending next snapshot chunk, and initiating a snapshot.
-     *
-     * @return true if any update is sent, false otherwise
      */
-    private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
-                                       boolean sendHeartbeat, boolean isHeartbeat) {
+    private void sendUpdatesToFollower(final String followerId, final FollowerLogInformation followerLogInformation,
+                                       final boolean sendHeartbeat, final boolean isHeartbeat) {
         ActorSelection followerActor = context.getPeerActorSelection(followerId);
         if (followerActor != null) {
@@ -615,6 +665,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
                 }
+            } else if (followerLogInformation.isLogEntrySlicingInProgress()) {
+                sendAppendEntries = sendHeartbeat;
             } else {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
@@ -631,11 +683,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             followerNextIndex, followerId);
 
                     if (followerLogInformation.okToReplicate()) {
-                        // Try to send all the entries in the journal but not exceeding the max data size
-                        // for a single AppendEntries message.
-                        int maxEntries = (int) context.getReplicatedLog().size();
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
-                                context.getConfigParams().getSnapshotChunkSize());
+                        entries = getEntriesToSend(followerLogInformation, followerActor);
                         sendAppendEntries = true;
                     }
                 } else if (isFollowerActive && followerNextIndex >= 0
@@ -644,17 +692,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
 
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
-                                    + "follower-nextIndex: %d, leader-snapshot-index: %d, "
-                                    + "leader-last-index: %d", logName(), followerId,
-                                    followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
-                    }
-
                     // Send heartbeat to follower whenever install snapshot is initiated.
                     sendAppendEntries = true;
                     if (canInstallSnapshot(followerNextIndex)) {
+                        log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+                                + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+                                context.getReplicatedLog().size());
+
                         initiateCaptureSnapshot(followerId);
+                    } else {
+                        // It doesn't seem like we should ever reach here - most likely indicates something is
+                        // wrong.
+                        log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+                                + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+                                followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+                                context.getReplicatedLog().size());
                     }
 
                 } else if (sendHeartbeat) {
@@ -671,6 +724,76 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
+    private List<ReplicatedLogEntry> getEntriesToSend(FollowerLogInformation followerLogInfo,
+            ActorSelection followerActor) {
+        // Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries
+        // message.
+        int maxEntries = (int) context.getReplicatedLog().size();
+        final int maxDataSize = context.getConfigParams().getSnapshotChunkSize();
+        final long followerNextIndex = followerLogInfo.getNextIndex();
+        List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex,
+                maxEntries, maxDataSize);
+
+        // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
+        // that is the case, then we need to slice it into smaller chunks.
+        if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) {
+            // Don't need to slice.
+            return entries;
+        }
+
+        log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(),
+                maxDataSize);
+
+        // If an AppendEntries has already been serialized for the log index then reuse the
+        // SharedFileBackedOutputStream.
+        final Long logIndex = entries.get(0).getIndex();
+        SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex);
+        if (fileBackedStream == null) {
+            fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance();
+
+            final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+                    getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries,
+                    context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion());
+
+            log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries,
+                    followerLogInfo.getId());
+
+            try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+                out.writeObject(appendEntries);
+            } catch (IOException e) {
+                log.error("{}: Error serializing {}", logName(), appendEntries, e);
+                fileBackedStream.cleanup();
+                return Collections.emptyList();
+            }
+
+            sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream);
+
+            fileBackedStream.setOnCleanupCallback(index -> {
+                log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index);
+                sharedSerializedAppendEntriesStreams.remove(index);
+            }, logIndex);
+        } else {
+            log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId());
+            fileBackedStream.incrementUsageCount();
+        }
+
+        log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId());
+
+        // Record that slicing is in progress for the follower.
+        followerLogInfo.setSlicedLogEntryIndex(logIndex);
+
+        final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId());
+        appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier)
+                .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor())
+                .onFailureCallback(failure -> {
+                    log.error("{}: Error slicing AppendEntries for follower {}", logName(),
+                            followerLogInfo.getId(), failure);
+                    followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX);
+                }).build());
+
+        return Collections.emptyList();
+    }
+
     private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
             FollowerLogInformation followerLogInformation) {
         // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
         //       empty AppendEntries as a heart beat to prevent election.
         // - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
         //       need to send AppendEntries to prevent election.
+        // - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
+        //       need to send an empty AppendEntries to prevent election.
         boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
-        long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
-            context.getCommitIndex();
+        long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
+                || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();
 
         long followerNextIndex = followerLogInformation.getNextIndex();
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
@@ -717,9 +842,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * @param followerId the id of the follower.
      * @return true if capture was initiated, false otherwise.
      */
-    public boolean initiateCaptureSnapshot(String followerId) {
+    public boolean initiateCaptureSnapshot(final String followerId) {
         FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             // If a snapshot is present in the memory, most likely another install is in progress no need to capture
             // snapshot. This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -727,19 +852,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
             sendSnapshotChunk(followerActor, followerLogInfo);
             return true;
-        } else {
-            boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
-                    this.getReplicatedToAllIndex(), followerId);
-            if (captureInitiated) {
-                followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
-                        context.getConfigParams().getSnapshotChunkSize(), logName()));
-            }
+        }
 
-            return captureInitiated;
+        boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+                this.getReplicatedToAllIndex(), followerId);
+        if (captureInitiated) {
+            followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+                    context.getConfigParams().getSnapshotChunkSize(), logName()));
         }
+
+        return captureInitiated;
     }
 
-    private boolean canInstallSnapshot(long nextIndex) {
+    private boolean canInstallSnapshot(final long nextIndex) {
         // If the follower's nextIndex is -1 then we might as well send it a snapshot
         // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
         // in the snapshot
@@ -771,8 +896,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * Sends a snapshot chunk to a given follower
      * InstallSnapshot should qualify as a heartbeat too.
      */
-    private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
-        if (snapshot.isPresent()) {
+    private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo) {
+        if (snapshotHolder.isPresent()) {
             LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
             if (installSnapshotState == null) {
                 installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
@@ -780,35 +905,44 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
 
-            // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+            try {
+                // Ensure the snapshot bytes are set - this is a no-op.
+                installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
-            byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+                if (!installSnapshotState.canSendNextChunk()) {
+                    return;
+                }
 
-            log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
-                    nextSnapshotChunk.length);
+                byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
-            int nextChunkIndex = installSnapshotState.incrementChunkIndex();
-            Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-            if (installSnapshotState.isLastChunk(nextChunkIndex)) {
-                serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
-            }
+                log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+                        nextSnapshotChunk.length);
 
-            followerActor.tell(
-                new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshot.get().getLastIncludedIndex(),
-                    snapshot.get().getLastIncludedTerm(),
-                    nextSnapshotChunk,
-                    nextChunkIndex,
-                    installSnapshotState.getTotalChunks(),
-                    Optional.of(installSnapshotState.getLastChunkHashCode()),
-                    serverConfig
-                ).toSerializable(followerLogInfo.getRaftVersion()),
-                actor()
-            );
+                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+                if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+                }
+
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        snapshotHolder.get().getLastIncludedIndex(),
+                        snapshotHolder.get().getLastIncludedTerm(),
+                        nextSnapshotChunk,
+                        nextChunkIndex,
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
+                        serverConfig
+                    ).toSerializable(followerLogInfo.getRaftVersion()),
+                    actor()
+                );
+
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
 
             log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
-                    installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+                        installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
         }
     }
 
@@ -816,6 +950,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!followerToLog.isEmpty()) {
             log.trace("{}: Sending heartbeat", logName());
             sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
+
+            appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
         }
     }
 
@@ -825,7 +961,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
-    private void scheduleHeartBeat(FiniteDuration interval) {
+    private void scheduleHeartBeat(final FiniteDuration interval) {
         if (followerToLog.isEmpty()) {
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
@@ -847,6 +983,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     @Override
     public void close() {
         stopHeartBeat();
+        appendEntriesMessageSlicer.close();
     }
 
     @Override
@@ -891,7 +1028,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    public FollowerLogInformation getFollower(String followerId) {
+    public FollowerLogInformation getFollower(final String followerId) {
         return followerToLog.get(followerId);
     }
 
@@ -900,15 +1037,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.size();
     }
 
-    private static class SnapshotHolder {
+    static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
-        private final ByteString snapshotBytes;
+        private final ByteSource snapshotBytes;
 
-        SnapshotHolder(Snapshot snapshot) {
+        SnapshotHolder(final Snapshot snapshot, final ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+            this.snapshotBytes = snapshotBytes;
         }
 
         long getLastIncludedTerm() {
@@ -919,7 +1056,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return lastIncludedIndex;
         }
 
-        ByteString getSnapshotBytes() {
+        ByteSource getSnapshotBytes() {
             return snapshotBytes;
         }
     }
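
The slicing change in getEntriesToSend() follows a simple pattern: serialize the oversized AppendEntries once, keep the serialized bytes in a shared, file-backed stream so multiple followers can reuse them, and ship the bytes in slices no larger than the configured maximum message size. The following is a minimal, self-contained sketch of that idea using only JDK classes; the names SliceDemo and serializeAndSlice are illustrative and are not part of the controller codebase, which uses MessageSlicer, SliceOptions and SharedFileBackedOutputStream for this purpose.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: serialize a message once, then cut the resulting bytes into
    // slices that each fit under a configured maximum message size.
    public final class SliceDemo {

        static List<byte[]> serializeAndSlice(Serializable message, int maxSliceSize) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(message);
            }
            byte[] serialized = bytes.toByteArray();

            List<byte[]> slices = new ArrayList<>();
            for (int offset = 0; offset < serialized.length; offset += maxSliceSize) {
                int length = Math.min(maxSliceSize, serialized.length - offset);
                byte[] slice = new byte[length];
                System.arraycopy(serialized, offset, slice, 0, length);
                slices.add(slice);
            }
            return slices;
        }

        public static void main(String[] args) throws IOException {
            // A payload larger than the 1024-byte slice size used here for demonstration.
            byte[] payload = new byte[5000];
            List<byte[]> slices = serializeAndSlice(payload, 1024);
            System.out.println("Sliced into " + slices.size() + " chunks");
        }
    }

In the real code the serialized bytes are kept in a SharedFileBackedOutputStream keyed by log index, so that when several followers need the same oversized entry the serialization cost is paid once and the backing file is cleaned up only after its last user releases it.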
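The new possiblyUpdateCommitIndex() method applies the commit rule from §5.3/§5.4 of the Raft paper: advance commitIndex to the largest index N that a majority of members have replicated, provided log[N] was written in the current term (§5.4.1). Below is a rough, stand-alone sketch of that rule, assuming a simplified majority over all members rather than the voting-member and local-persistence accounting the real code performs; the class and parameter names are illustrative.

    import java.util.List;
    import java.util.function.LongUnaryOperator;

    // Illustrative only: the commit-index advancement rule of Raft §5.3/§5.4.
    final class CommitIndexRule {

        static long newCommitIndex(long commitIndex, long lastLogIndex, long currentTerm,
                List<Long> followerMatchIndexes, LongUnaryOperator termForIndex) {
            // The leader itself counts toward the majority of (followers + 1) members.
            int majority = (followerMatchIndexes.size() + 1) / 2 + 1;

            for (long index = lastLogIndex; index > commitIndex; index--) {
                int replicated = 1; // the leader's own copy
                for (long matchIndex : followerMatchIndexes) {
                    if (matchIndex >= index) {
                        replicated++;
                    }
                }
                // Only entries from the current term may be committed by counting replicas (§5.4.1).
                if (replicated >= majority && termForIndex.applyAsLong(index) == currentTerm) {
                    return index;
                }
            }
            return commitIndex;
        }
    }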