diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index d85ac8ef67..b241e0a67a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -16,23 +16,27 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Map.Entry; +import java.util.Queue; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; +import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; +import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -42,6 +46,8 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import scala.concurrent.duration.FiniteDuration; /** @@ -67,77 +73,126 @@ import scala.concurrent.duration.FiniteDuration; * set commitIndex = N (§5.3, §5.4).
*/ public abstract class AbstractLeader extends AbstractRaftActorBehavior { - protected final Map followerToLog = new HashMap<>(); - protected final Map mapFollowerToSnapshot = new HashMap<>(); - protected final Set followers; + // The index of the first chunk that is sent when installing a snapshot + public static final int FIRST_CHUNK_INDEX = 1; - private Cancellable heartbeatSchedule = null; + // The index that the follower should respond with if it needs the install snapshot to be reset + public static final int INVALID_CHUNK_INDEX = -1; + + // This would be passed as the hash code of the last chunk when sending the first chunk + public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; + + private final Map followerToLog = new HashMap<>(); + private final Map mapFollowerToSnapshot = new HashMap<>(); - private List trackerList = new ArrayList<>(); + /** + * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really + * expect the entries to be modified in sequence, hence we open-code the lookup. + * + * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly, + * but we already expect those to be far from frequent. + */ + private final Queue trackers = new LinkedList<>(); - protected final int minReplicationCount; + private Cancellable heartbeatSchedule = null; + private Optional snapshot = Optional.absent();; + private int minReplicationCount; + + protected AbstractLeader(RaftActorContext context, RaftState state, + @Nullable AbstractLeader initializeFromLeader) { + super(context, state); + + if(initializeFromLeader != null) { + followerToLog.putAll(initializeFromLeader.followerToLog); + mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot); + snapshot = initializeFromLeader.snapshot; + trackers.addAll(initializeFromLeader.trackers); + } else { + for(PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); + } + } - protected final int minIsolatedLeaderPeerCount; + LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); - private Optional snapshot; + updateMinReplicaCount(); - public AbstractLeader(RaftActorContext context) { - super(context); + // Immediately schedule a heartbeat + // Upon election: send initial empty AppendEntries RPCs + // (heartbeat) to each server; repeat during idle periods to + // prevent election timeouts (§5.2) + sendAppendEntries(0, false); - followers = context.getPeerAddresses().keySet(); + // It is important to schedule this heartbeat here + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + } - for (String followerId : followers) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - new AtomicLong(context.getCommitIndex()), - new AtomicLong(-1), - context.getConfigParams().getElectionTimeOutInterval()); + protected AbstractLeader(RaftActorContext context, RaftState state) { + this(context, state, null); + } - followerToLog.put(followerId, followerLogInformation); - } + /** + * Return an immutable collection of follower identifiers. 
+ * + * @return Collection of follower IDs + */ + public final Collection getFollowerIds() { + return followerToLog.keySet(); + } - leaderId = context.getId(); + public void addFollower(String followerId) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl( + context.getPeerInfo(followerId), -1, context); + followerToLog.put(followerId, followerLogInformation); - if(LOG.isDebugEnabled()) { - LOG.debug("Election:Leader has following peers: {}", followers); + if(heartbeatSchedule == null) { + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + } - minReplicationCount = getMajorityVoteCount(followers.size()); - - // the isolated Leader peer count will be 1 less than the majority vote count. - // this is because the vote count has the self vote counted in it - // for e.g - // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 - // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 - // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 - minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0; + public void removeFollower(String followerId) { + followerToLog.remove(followerId); + mapFollowerToSnapshot.remove(followerId); + } - snapshot = Optional.absent(); + public void updateMinReplicaCount() { + int numVoting = 0; + for(PeerInfo peer: context.getPeers()) { + if(peer.isVoting()) { + numVoting++; + } + } - // Immediately schedule a heartbeat - // Upon election: send initial empty AppendEntries RPCs - // (heartbeat) to each server; repeat during idle periods to - // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + minReplicationCount = getMajorityVoteCount(numVoting); } - private Optional getSnapshot() { - return snapshot; + protected int getMinIsolatedLeaderPeerCount(){ + //the isolated Leader peer count will be 1 less than the majority vote count. + //this is because the vote count has the self vote counted in it + //for e.g + //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 + //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 + //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 + + return minReplicationCount > 0 ? (minReplicationCount - 1) : 0; } @VisibleForTesting - void setSnapshot(Optional snapshot) { - this.snapshot = snapshot; + void setSnapshot(@Nullable Snapshot snapshot) { + if(snapshot != null) { + this.snapshot = Optional.of(new SnapshotHolder(snapshot)); + } else { + this.snapshot = Optional.absent(); + } } @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - if(LOG.isDebugEnabled()) { - LOG.debug(appendEntries.toString()); - } + LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); return this; } @@ -146,10 +201,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - if(! 
appendEntriesReply.isSuccess()) { - if(LOG.isDebugEnabled()) { - LOG.debug(appendEntriesReply.toString()); - } + if(LOG.isTraceEnabled()) { + LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); } // Update the FollowerLogInformation @@ -158,76 +211,190 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - LOG.error("Unknown follower {}", followerId); + LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); return this; } - followerLogInformation.markFollowerActive(); + if(followerLogInformation.timeSinceLastActivity() > + context.getConfigParams().getElectionTimeOutInterval().toMillis()) { + LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", + logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), + context.getLastApplied(), context.getCommitIndex()); + } - if (appendEntriesReply.isSuccess()) { - followerLogInformation - .setMatchIndex(appendEntriesReply.getLogLastIndex()); - followerLogInformation - .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); + followerLogInformation.markFollowerActive(); + followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); + followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion()); + + boolean updated = false; + if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { + // The follower's log is actually ahead of the leader's log. Normally this doesn't happen + // in raft as a node cannot become leader if it's log is behind another's. However, the + // non-voting semantics deviate a bit from raft. Only voting members participate in + // elections and can become leader so it's possible for a non-voting follower to be ahead + // of the leader. This can happen if persistence is disabled and all voting members are + // restarted. In this case, the voting leader will start out with an empty log however + // the non-voting followers still retain the previous data in memory. On the first + // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex + // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned + // lastLogIndex may be higher in which case we want to reset the follower by installing a + // snapshot. It's also possible that the follower's last log index is behind the leader's. + // However in this case the log terms won't match and the logs will conflict - this is handled + // elsewhere. 
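To make the scenario above concrete: suppose persistence is disabled and all voting members restart. The following is a minimal, self-contained illustration; the class and variable names are invented for the example, and the real check appears in the diff code that follows.

public final class NonVotingFollowerAheadExample {
    public static void main(String[] args) {
        // Voting members restarted with persistence disabled: the new leader's journal is empty.
        long leaderLastIndex = -1;
        // A non-voting follower still holds its previous entries in memory.
        long followerLastIndex = 5;

        // The leader's first AppendEntries carries prevLogIndex == -1, so the
        // follower's integrity checks pass and it replies success with logLastIndex == 5.
        boolean replySuccess = true;

        if (replySuccess && followerLastIndex > leaderLastIndex) {
            // Mirrors the handling below: reset matchIndex/nextIndex to -1 and
            // force an install snapshot to replace the follower's stale state.
            System.out.println("Follower ahead of leader - forcing install snapshot");
        }
    }
}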
+ LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot", + logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(), + context.getReplicatedLog().lastIndex()); + + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + initiateCaptureSnapshot(followerId); + updated = true; + } else if (appendEntriesReply.isSuccess()) { + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { + LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); + + long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); + long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex); + if(appendEntriesReply.isForceInstallSnapshot()) { + // Reset the followers match and next index. This is to signal that this follower has nothing + // in common with this Leader and so would require a snapshot to be installed + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + // Force initiate a snapshot capture + initiateCaptureSnapshot(followerId); + } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 && + followersLastLogTerm == appendEntriesReply.getLogLastTerm())) { + // The follower's log is empty or the last entry is present in the leader's journal + // and the terms match so the follower is just behind the leader's journal from + // the last snapshot, if any. We'll catch up the follower quickly by starting at the + // follower's last log index. + + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); + } else { + // The follower's log conflicts with leader's log so decrement follower's next index by 1 + // in an attempt to find where the logs match. - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about + LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index", + logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm); - followerLogInformation.decrNextIndex(); + followerLogInformation.decrNextIndex(); + } } // Now figure out if this reply warrants a change in the commitIndex // If there exists an N such that N > commitIndex, a majority // of matchIndex[i] ≥ N, and log[N].term == currentTerm: // set commitIndex = N (§5.3, §5.4). 
+ if(LOG.isTraceEnabled()) { + LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); + } + for (long N = context.getCommitIndex() + 1; ; N++) { int replicatedCount = 1; + LOG.trace("{}: checking Nth index {}", logName(), N); for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex().get() >= N) { + final PeerInfo peerInfo = context.getPeerInfo(info.getId()); + if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) { replicatedCount++; + } else if(LOG.isTraceEnabled()) { + LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), + info.getMatchIndex(), peerInfo); } } + if(LOG.isTraceEnabled()) { + LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount); + } + if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); - if (replicatedLogEntry != null && - replicatedLogEntry.getTerm() == currentTerm()) { + if (replicatedLogEntry == null) { + LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size()); + break; + } + + // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: + // "Raft never commits log entries from previous terms by counting replicas". + // However we keep looping so we can make progress when new entries in the current term + // reach consensus, as per §5.4.1: "once an entry from the current term is committed by + // counting replicas, then all prior entries are committed indirectly". 
+ if (replicatedLogEntry.getTerm() == currentTerm()) { + LOG.trace("{}: Setting commit index to {}", logName(), N); context.setCommitIndex(N); + } else { + LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}", + logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); } } else { + LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); break; } } // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { + if(LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied()); + } + applyLogToStateMachine(context.getCommitIndex()); } + if (!context.getSnapshotManager().isCapturing()) { + purgeInMemoryLog(); + } + + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + return this; } - protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation, + AppendEntriesReply appendEntriesReply) { + boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); + updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; + + if(updated && LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", + logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); + } + return updated; + } - ClientRequestTracker toRemove = findClientRequestTracker(logIndex); - if(toRemove != null) { - trackerList.remove(toRemove); + private void purgeInMemoryLog() { + //find the lowest index across followers which has been replicated to all. + // lastApplied if there are no followers, so that we keep clearing the log for single-node + // we would delete the in-mem log from that index on, in-order to minimize mem usage + // we would also share this info thru AE with the followers so that they can delete their log entries as well. + long minReplicatedToAllIndex = followerToLog.isEmpty() ? 
context.getLastApplied() : Long.MAX_VALUE; + for (FollowerLogInformation info : followerToLog.values()) { + minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); } - return toRemove; + super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - for (ClientRequestTracker tracker : trackerList) { - if (tracker.getIndex() == logIndex) { - return tracker; + @Override + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + final Iterator it = trackers.iterator(); + while (it.hasNext()) { + final ClientRequestTracker t = it.next(); + if (t.getIndex() == logIndex) { + it.remove(); + return t; } } + return null; } @@ -237,289 +404,320 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return this; } - @Override - public RaftState state() { - return RaftState.Leader; - } + protected void beforeSendHeartbeat(){} @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); - Object message = fromSerializableMessage(originalMessage); - if (message instanceof RaftRPC) { RaftRPC rpc = (RaftRPC) message; // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", + logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return switchBehavior(new Follower(context)); + return internalSwitchBehavior(RaftState.Follower); } } - try { - if (message instanceof SendHeartBeat) { - sendHeartBeat(); - return this; - - } else if(message instanceof InitiateInstallSnapshot) { - installSnapshotIfNeeded(); - - } else if(message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); - sendInstallSnapshot(); - - } else if (message instanceof Replicate) { - replicate((Replicate) message); - - } else if (message instanceof InstallSnapshotReply){ - handleInstallSnapshotReply((InstallSnapshotReply) message); - - } - } finally { + if (message instanceof SendHeartBeat) { + beforeSendHeartbeat(); + sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + } else if(message instanceof SendInstallSnapshot) { + // received from RaftActor + setSnapshot(((SendInstallSnapshot) message).getSnapshot()); + sendInstallSnapshot(); + } else if (message instanceof Replicate) { + replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply) { + handleInstallSnapshotReply((InstallSnapshotReply) message); + } else { + return super.handleMessage(sender, message); } - return super.handleMessage(sender, message); + return this; } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { + LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); + String followerId = reply.getFollowerId(); FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + + if (followerToSnapshot == null) { + LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", + logName(), 
followerId); + return; + } + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - followerLogInformation.markFollowerActive(); + if(followerLogInformation == null) { + // This can happen during AddServer if it times out. + LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", + logName(), followerId); + mapFollowerToSnapshot.remove(followerId); + return; + } - if (followerToSnapshot != null && - followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + followerLogInformation.markFollowerActive(); + if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + boolean wasLastChunk = false; if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply if(LOG.isDebugEnabled()) { - LOG.debug("InstallSnapshotReply received, " + - "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - reply.getChunkIndex(), followerId, + LOG.debug("{}: InstallSnapshotReply received, " + + "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}", + logName(), reply.getChunkIndex(), followerId, context.getReplicatedLog().getSnapshotIndex() + 1 ); } - followerLogInformation.setMatchIndex( - context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation.setNextIndex( - context.getReplicatedLog().getSnapshotIndex() + 1); + long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + followerLogInformation.setMatchIndex(followerMatchIndex); + followerLogInformation.setNextIndex(followerMatchIndex + 1); mapFollowerToSnapshot.remove(followerId); - if(LOG.isDebugEnabled()) { - LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + - followerToLog.get(followerId).getNextIndex().get()); - } + LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", + logName(), followerId, followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory - setSnapshot(Optional.absent()); + setSnapshot(null); + } + wasLastChunk = true; + if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ + UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = + new UnInitializedFollowerSnapshotReply(followerId); + context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); + LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self"); } - } else { followerToSnapshot.markSendStatus(true); } } else { - LOG.info("InstallSnapshotReply received, " + - "sending snapshot chunk failed, Will retry, Chunk:{}", - reply.getChunkIndex() - ); + LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", + logName(), reply.getChunkIndex()); + followerToSnapshot.markSendStatus(false); } + if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { + // Since the follower is now caught up try to purge the log. + purgeInMemoryLog(); + } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + ActorSelection followerActor = context.getPeerActorSelection(followerId); + if(followerActor != null) { + sendSnapshotChunk(followerActor, followerId); + } + } + } else { - LOG.error("ERROR!!" 
+ - "FollowerId in InstallSnapshotReply not known to Leader" + - " or Chunk Index in InstallSnapshotReply not matching {} != {}", - followerToSnapshot.getChunkIndex(), reply.getChunkIndex() - ); + LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", + logName(), reply.getChunkIndex(), followerId, + followerToSnapshot.getChunkIndex()); + + if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + // Since the Follower did not find this index to be valid we should reset the follower snapshot + // so that Installing the snapshot can resume from the beginning + followerToSnapshot.reset(); + } } } private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - if(LOG.isDebugEnabled()) { - LOG.debug("Replicate message {}", logIndex); - } + LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); // Create a tracker entry we will use this later to notify the // client actor - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); + if(replicate.getClientActor() != null) { + trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), + logIndex)); + } + + boolean applyModificationToState = !context.anyVotingPeers() + || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); - if (followers.size() == 0) { + if(applyModificationToState){ context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); - } else { - sendAppendEntries(); + } + + if (!followerToLog.isEmpty()) { + sendAppendEntries(0, false); } } - private void sendAppendEntries() { + protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { // Send an AppendEntries to all followers - for (String followerId : followers) { - ActorSelection followerActor = context.getPeerActorSelection(followerId); + for (Entry e : followerToLog.entrySet()) { + final String followerId = e.getKey(); + final FollowerLogInformation followerLogInformation = e.getValue(); + // This checks helps not to send a repeat message to the follower + if(!followerLogInformation.isFollowerActive() || + followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { + sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat); + } + } + } - if (followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long followerNextIndex = followerLogInformation.getNextIndex().get(); - boolean isFollowerActive = followerLogInformation.isFollowerActive(); - List entries = null; - - if (mapFollowerToSnapshot.get(followerId) != null) { - // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { - sendSnapshotChunk(followerActor, followerId); - } else { - // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerNextIndex, - Collections.emptyList()); - } + /** + * + * This method checks if any update needs to be sent to the given follower. This includes append log entries, + * sending next snapshot chunk, and initiating a snapshot. 
+ * @return true if any update is sent, false otherwise + */ - } else { - long leaderLastIndex = context.getReplicatedLog().lastIndex(); - long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { - // FIXME : Sending one entry at a time - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - - } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex ) { - // if the followers next index is not present in the leaders log, and - // if the follower is just not starting and if leader's index is more than followers index - // then snapshot should be sent - - if(LOG.isDebugEnabled()) { - LOG.debug("InitiateInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); - } - actor().tell(new InitiateInstallSnapshot(), actor()); - - // we would want to sent AE as the capture snapshot might take time - entries = Collections.emptyList(); - - } else { - //we send an AppendEntries, even if the follower is inactive - // in-order to update the followers timestamp, in case it becomes active again - entries = Collections.emptyList(); + private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, + boolean sendHeartbeat, boolean isHeartbeat) { + + ActorSelection followerActor = context.getPeerActorSelection(followerId); + if (followerActor != null) { + long followerNextIndex = followerLogInformation.getNextIndex(); + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + boolean sendAppendEntries = false; + List entries = Collections.emptyList(); + + if (mapFollowerToSnapshot.get(followerId) != null) { + // if install snapshot is in process , then sent next chunk if possible + if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerId); + } else if(sendHeartbeat) { + // we send a heartbeat even if we have not received a reply for the last chunk + sendAppendEntries = true; + } + } else { + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + + if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { + LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); + } + + if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { + + LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), + followerNextIndex, followerId); + + if(followerLogInformation.okToReplicate()) { + // Try to send all the entries in the journal but not exceeding the max data size + // for a single AppendEntries message. 
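The call below fetches up to maxEntries journal entries, capped by the configured snapshot chunk size. A condensed sketch of that size-capped selection, assuming (this is not shown in the diff itself) that ReplicatedLog.getFrom(index, maxEntries, maxDataSize) stops before the cumulative payload size exceeds the cap but always returns at least one entry so progress is made:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

public final class SizeCappedBatch {
    /** Collects entries starting at fromIndex until the count or byte-size cap is hit. */
    static <T> List<T> getFrom(List<T> journal, int fromIndex, int maxEntries,
            long maxDataSize, ToLongFunction<T> payloadSize) {
        List<T> batch = new ArrayList<>();
        long totalSize = 0;
        for (int i = fromIndex; i < journal.size() && batch.size() < maxEntries; i++) {
            totalSize += payloadSize.applyAsLong(journal.get(i));
            // Always include at least one entry so a single oversized payload
            // still makes progress; otherwise stop before exceeding the cap.
            if (!batch.isEmpty() && totalSize > maxDataSize) {
                break;
            }
            batch.add(journal.get(i));
        }
        return batch;
    }

    public static void main(String[] args) {
        List<String> journal = Arrays.asList("aa", "bbbb", "cc", "dd");
        // 5-byte cap: "aa" (2) plus "bbbb" (4) would exceed it, so only "aa" is sent.
        System.out.println(getFrom(journal, 0, 10, 5, String::length)); // prints [aa]
    }
}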
+ int maxEntries = (int) context.getReplicatedLog().size(); + entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries, + context.getConfigParams().getSnapshotChunkSize()); + sendAppendEntries = true; + } + } else if (isFollowerActive && followerNextIndex >= 0 && + leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { + // if the followers next index is not present in the leaders log, and + // if the follower is just not starting and if leader's index is more than followers index + // then snapshot should be sent + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " + + "follower-nextIndex: %d, leader-snapshot-index: %d, " + + "leader-last-index: %d", logName(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); } - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); + // Send heartbeat to follower whenever install snapshot is initiated. + sendAppendEntries = true; + if (canInstallSnapshot(followerNextIndex)) { + initiateCaptureSnapshot(followerId); + } + } else if(sendHeartbeat) { + // we send an AppendEntries, even if the follower is inactive + // in-order to update the followers timestamp, in case it becomes active again + sendAppendEntries = true; } + + } + + if(sendAppendEntries) { + sendAppendEntriesToFollower(followerActor, followerNextIndex, + entries, followerId); } } } private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, - List entries) { - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); + List entries, String followerId) { + AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), + getLogEntryIndex(followerNextIndex - 1), + getLogEntryTerm(followerNextIndex - 1), entries, + context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion()); + + if(!entries.isEmpty() || LOG.isTraceEnabled()) { + LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId, + appendEntries); + } + + followerActor.tell(appendEntries, actor()); } /** - * An installSnapshot is scheduled at a interval that is a multiple of - * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing - * snapshots at every heartbeat. - * * Install Snapshot works as follows - * 1. Leader sends a InitiateInstallSnapshot message to self - * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor - * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log + * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor + * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log * and makes a call to Leader's handleMessage , with SendInstallSnapshot message. - * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower - * 5. On complete, Follower sends back a InstallSnapshotReply. - * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower + * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower + * 4. On complete, Follower sends back a InstallSnapshotReply. + * 5. 
On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower * and replenishes the memory by deleting the snapshot in Replicated log. - * + * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply) + * then send the existing snapshot in chunks to the follower. + * @param followerId */ - private void installSnapshotIfNeeded() { - for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); - - if(followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - long nextIndex = followerLogInformation.getNextIndex().get(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - LOG.info("{} follower needs a snapshot install", followerId); - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot - sendSnapshotChunk(followerActor, followerId); - - } else { - initiateCaptureSnapshot(); - //we just need 1 follower who would need snapshot to be installed. - // when we have the snapshot captured, we would again check (in SendInstallSnapshot) - // who needs an install and send to all who need - break; - } - - } - } + public boolean initiateCaptureSnapshot(String followerId) { + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot. + // This could happen if another follower needs an install when one is going on. + final ActorSelection followerActor = context.getPeerActorSelection(followerId); + sendSnapshotChunk(followerActor, followerId); + return true; + } else { + return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } - // on every install snapshot, we try to capture the snapshot. - // Once a capture is going on, another one issued will get ignored by RaftActor. 
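The chunked transfer outlined in the steps above is flow-controlled stop-and-wait: the next chunk is sent only after a successful reply for the previous one, a failed reply causes the same chunk to be retried, and an INVALID_CHUNK_INDEX reply restarts the transfer from the first chunk. A condensed, illustrative model of that state; this is not the actual FollowerToSnapshot class, which appears later in this diff:

public final class ChunkedTransferState {
    static final int FIRST_CHUNK_INDEX = 1;   // matches the constant defined above
    static final int INVALID_CHUNK_INDEX = -1;

    private int chunkIndex = FIRST_CHUNK_INDEX;
    private boolean awaitingReply;

    /** Stop-and-wait: only one chunk may be outstanding at a time. */
    boolean canSendNextChunk() {
        return !awaitingReply;
    }

    int sendChunk() {
        awaitingReply = true;
        return chunkIndex;
    }

    void onReply(int repliedIndex, boolean success) {
        if (repliedIndex == INVALID_CHUNK_INDEX) {
            // The follower asked for a reset: resume from the beginning.
            chunkIndex = FIRST_CHUNK_INDEX;
            awaitingReply = false;
        } else if (repliedIndex == chunkIndex) {
            awaitingReply = false;
            if (success) {
                chunkIndex++; // advance; on failure the same chunk is retried
            }
        } // replies for stale chunk indexes are ignored
    }
}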
- private void initiateCaptureSnapshot() { - LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId()); - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; + private boolean canInstallSnapshot(long nextIndex){ + // If the follower's nextIndex is -1 then we might as well send it a snapshot + // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present + // in the snapshot + return nextIndex == -1 || + (!context.getReplicatedLog().isPresent(nextIndex) + && context.getReplicatedLog().isInSnapshot(nextIndex)); - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - - boolean isInstallSnapshotInitiated = true; - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), - lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), - actor()); } private void sendInstallSnapshot() { - for (String followerId : followers) { + LOG.debug("{}: sendInstallSnapshot", logName()); + for (Entry e : followerToLog.entrySet()) { + String followerId = e.getKey(); ActorSelection followerActor = context.getPeerActorSelection(followerId); + FollowerLogInformation followerLogInfo = e.getValue(); - if(followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long nextIndex = followerLogInformation.getNextIndex().get(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { + if (followerActor != null) { + long nextIndex = followerLogInfo.getNextIndex(); + if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED || + canInstallSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, followerId); } } @@ -533,22 +731,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { + byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); + + // Note: the previous call to getNextSnapshotChunk has the side-effect of adding + // followerId to the followerToSnapshot map. 
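The chunks themselves are fixed-size slices of the captured snapshot bytes. The chunk-count and slicing arithmetic used by FollowerToSnapshot (shown later in this diff) reduces to the following standalone sketch:

import java.util.Arrays;

public final class ChunkMath {
    /** Same rounding-up division as FollowerToSnapshot's constructor. */
    static int totalChunks(int snapshotSize, int chunkSize) {
        return snapshotSize / chunkSize + (snapshotSize % chunkSize > 0 ? 1 : 0);
    }

    /** Slices the k-th (0-based) chunk; the last chunk may be short. */
    static byte[] chunk(byte[] snapshot, int chunkSize, int k) {
        int start = k * chunkSize;
        int end = Math.min(start + chunkSize, snapshot.length);
        return Arrays.copyOfRange(snapshot, start, end);
    }

    public static void main(String[] args) {
        byte[] snapshot = new byte[10];
        System.out.println(totalChunks(snapshot.length, 3)); // 4 chunks: 3+3+3+1
        System.out.println(chunk(snapshot, 3, 3).length);    // last chunk is 1 byte
    }
}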
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + + int nextChunkIndex = followerToSnapshot.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if(followerToSnapshot.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } + followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - getNextSnapshotChunk(followerId,snapshot.get()), - mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks() - ).toSerializable(), + snapshot.get().getLastIncludedIndex(), + snapshot.get().getLastIncludedTerm(), + nextSnapshotChunk, + nextChunkIndex, + followerToSnapshot.getTotalChunks(), + Optional.of(followerToSnapshot.getLastChunkHashCode()), + serverConfig + ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() ); - LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", - followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", + logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), + followerToSnapshot.getTotalChunks()); + } } } catch (IOException e) { - LOG.error(e, "InstallSnapshot failed for Leader."); + LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e); } } @@ -556,22 +771,23 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * Acccepts snaphot as ByteString, enters into map for future chunks * creates and return a ByteString chunk */ - private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { + private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { followerToSnapshot = new FollowerToSnapshot(snapshotBytes); mapFollowerToSnapshot.put(followerId, followerToSnapshot); } - ByteString nextChunk = followerToSnapshot.getNextChunk(); - if (LOG.isDebugEnabled()) { - LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); - } + byte[] nextChunk = followerToSnapshot.getNextChunk(); + + LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length); + return nextChunk; } private void sendHeartBeat() { - if (followers.size() > 0) { - sendAppendEntries(); + if (!followerToLog.isEmpty()) { + LOG.trace("{}: Sending heartbeat", logName()); + sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true); } } @@ -582,7 +798,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void scheduleHeartBeat(FiniteDuration interval) { - if(followers.size() == 0){ + if (followerToLog.isEmpty()) { // Optimization - do not bother scheduling a heartbeat as there are // no followers return; @@ -596,31 +812,37 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // need to be sent if there are other messages being sent to the remote // actor. 
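Outside of Akka, the single-shot scheduling pattern justified by the comment above looks roughly like this. This is a sketch using a plain ScheduledExecutorService; the real code re-arms the actor system's scheduleOnce when the SendHeartBeat message is handled:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public final class OneShotHeartbeat {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> schedule;

    /** Arms a single beat; each beat re-arms the next, mirroring scheduleOnce. */
    void arm(long intervalMillis, Runnable sendHeartBeat) {
        schedule = scheduler.schedule(() -> {
            sendHeartBeat.run();
            arm(intervalMillis, sendHeartBeat); // re-arm only after the beat is sent
        }, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        ScheduledFuture<?> current = schedule;
        if (current != null) {
            current.cancel(false);
        }
    }
}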
heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( - interval, context.getActor(), new SendHeartBeat(), + interval, context.getActor(), SendHeartBeat.INSTANCE, context.getActorSystem().dispatcher(), context.getActor()); } @Override - public void close() throws Exception { + public void close() { stopHeartBeat(); } @Override - public String getLeaderId() { + public final String getLeaderId() { return context.getId(); } + @Override + public final short getLeaderPayloadVersion() { + return context.getPayloadVersion(); + } + protected boolean isLeaderIsolated() { - int minPresent = minIsolatedLeaderPeerCount; + int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { - if (followerLogInformation.isFollowerActive()) { + final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId()); + if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { --minPresent; if (minPresent == 0) { - break; + return false; } } } - return (minPresent != 0); + return minPresent != 0; } /** @@ -628,26 +850,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * snapshot chunks */ protected class FollowerToSnapshot { - private ByteString snapshotBytes; + private final ByteString snapshotBytes; private int offset = 0; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset private int replyReceivedForOffset; // if replyStatus is false, the previous chunk is attempted private boolean replyStatus = false; private int chunkIndex; - private int totalChunks; + private final int totalChunks; + private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; public FollowerToSnapshot(ByteString snapshotBytes) { this.snapshotBytes = snapshotBytes; - replyReceivedForOffset = -1; - chunkIndex = 1; int size = snapshotBytes.size(); - totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + - ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); + totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) + + (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 
1 : 0); if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot {} bytes, total chunks to send:{}", - size, totalChunks); + LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", + logName(), size, totalChunks); } + replyReceivedForOffset = -1; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; } public ByteString getSnapshotBytes() { @@ -692,6 +916,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // if the chunk sent was successful replyReceivedForOffset = offset; replyStatus = true; + lastChunkHashCode = nextChunkHashCode; } else { // if the chunk sent was failure replyReceivedForOffset = offset; @@ -699,40 +924,99 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - public ByteString getNextChunk() { + public byte[] getNextChunk() { int snapshotLength = getSnapshotBytes().size(); int start = incrementOffset(); int size = context.getConfigParams().getSnapshotChunkSize(); if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { size = snapshotLength; - } else { - if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } + } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { + size = snapshotLength - start; } - if(LOG.isDebugEnabled()) { - LOG.debug("length={}, offset={},size={}", - snapshotLength, start, size); - } - return getSnapshotBytes().substring(start, start + size); + byte[] nextChunk = new byte[size]; + getSnapshotBytes().copyTo(nextChunk, start, 0, size); + nextChunkHashCode = Arrays.hashCode(nextChunk); + + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(), + snapshotLength, start, size, nextChunkHashCode); + return nextChunk; + } + + /** + * reset should be called when the Follower needs to be sent the snapshot from the beginning + */ + public void reset(){ + offset = 0; + replyStatus = false; + replyReceivedForOffset = offset; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; + lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + } + public int getLastChunkHashCode() { + return lastChunkHashCode; } } // called from example-actor for printing the follower-states public String printFollowerStates() { - StringBuilder sb = new StringBuilder(); - for(FollowerLogInformation followerLogInformation : followerToLog.values()) { - boolean isFollowerActive = followerLogInformation.isFollowerActive(); - sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},"); + final StringBuilder sb = new StringBuilder(); + sb.append('['); + for (FollowerLogInformation followerLogInformation : followerToLog.values()) { + sb.append('{'); + sb.append(followerLogInformation.getId()); + sb.append(" state:"); + sb.append(followerLogInformation.isFollowerActive()); + sb.append("},"); } - return "[" + sb.toString() + "]"; + sb.append(']'); + + return sb.toString(); + } + + @VisibleForTesting + public FollowerLogInformation getFollower(String followerId) { + return followerToLog.get(followerId); + } + + @VisibleForTesting + protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) { + mapFollowerToSnapshot.put(followerId, snapshot); + } + + @VisibleForTesting + public int followerSnapshotSize() { + return mapFollowerToSnapshot.size(); } @VisibleForTesting - void markFollowerActive(String followerId) { - followerToLog.get(followerId).markFollowerActive(); + public int followerLogSize() { + return followerToLog.size(); + } + + private static 
class SnapshotHolder { + private final long lastIncludedTerm; + private final long lastIncludedIndex; + private final ByteString snapshotBytes; + + SnapshotHolder(Snapshot snapshot) { + this.lastIncludedTerm = snapshot.getLastAppliedTerm(); + this.lastIncludedIndex = snapshot.getLastAppliedIndex(); + this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + } + + long getLastIncludedTerm() { + return lastIncludedTerm; + } + + long getLastIncludedIndex() { + return lastIncludedIndex; + } + + ByteString getSnapshotBytes() { + return snapshotBytes; + } } }
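Finally, the voting-member arithmetic threaded through this class (updateMinReplicaCount, getMinIsolatedLeaderPeerCount, isLeaderIsolated) can be summarized in one place. This is a standalone sketch, not the class's API; the majority formula is an assumption chosen to match the worked examples in the source comments (0 peers = 1 vote required, 2 peers = 2, 4 peers = 3):

public final class QuorumMath {
    /** Votes needed for consensus, counting the leader's own vote. */
    static int minReplicationCount(int votingPeers) {
        return (votingPeers + 1) / 2 + 1; // 0 -> 1, 2 -> 2, 4 -> 3
    }

    /** One less than the majority, since the majority already includes the self vote. */
    static int minIsolatedLeaderPeerCount(int votingPeers) {
        int majority = minReplicationCount(votingPeers);
        return majority > 0 ? majority - 1 : 0;
    }

    /** The leader considers itself isolated when too few voting followers are active. */
    static boolean isLeaderIsolated(int activeVotingFollowers, int votingPeers) {
        return activeVotingFollowers < minIsolatedLeaderPeerCount(votingPeers);
    }

    public static void main(String[] args) {
        System.out.println(minReplicationCount(0)); // 1
        System.out.println(minReplicationCount(2)); // 2
        System.out.println(minReplicationCount(4)); // 3
        // With 4 voting peers, at least 2 active followers are needed to avoid isolation.
        System.out.println(isLeaderIsolated(1, 4)); // true
    }
}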