X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=5076a8a38f891c894b9e01fdc61db5c0d5fa11a9;hp=a1ec4d883135f42d88b9d08db82ba49acb1816ba;hb=95d3c7975a423951dcbdecfbfa4cb6b7a23591cc;hpb=b2f6cabb2d2b96bf334788f106cfee0ceb5feff1 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a1ec4d8831..5076a8a38f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -16,7 +16,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -47,6 +46,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import scala.concurrent.duration.FiniteDuration; /** @@ -72,18 +72,7 @@ import scala.concurrent.duration.FiniteDuration; * set commitIndex = N (§5.3, §5.4). */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { - - // The index of the first chunk that is sent when installing a snapshot - public static final int FIRST_CHUNK_INDEX = 1; - - // The index that the follower should respond with if it needs the install snapshot to be reset - public static final int INVALID_CHUNK_INDEX = -1; - - // This would be passed as the hash code of the last chunk when sending the first chunk - public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - private final Map followerToLog = new HashMap<>(); - private final Map mapFollowerToSnapshot = new HashMap<>(); /** * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really @@ -95,23 +84,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Queue trackers = new LinkedList<>(); private Cancellable heartbeatSchedule = null; - private Optional snapshot; + private Optional snapshot = Optional.absent();; private int minReplicationCount; - protected AbstractLeader(RaftActorContext context, RaftState state) { + protected AbstractLeader(RaftActorContext context, RaftState state, + @Nullable AbstractLeader initializeFromLeader) { super(context, state); - for(PeerInfo peerInfo: context.getPeers()) { - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); - followerToLog.put(peerInfo.getId(), followerLogInformation); + if(initializeFromLeader != null) { + followerToLog.putAll(initializeFromLeader.followerToLog); + snapshot = initializeFromLeader.snapshot; + trackers.addAll(initializeFromLeader.trackers); + } else { + for(PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); + } } LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); updateMinReplicaCount(); - snapshot = Optional.absent(); - // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to @@ -122,6 +116,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + protected AbstractLeader(RaftActorContext context, RaftState state) { + this(context, state, null); + } + /** * Return an immutable collection of follower identifiers. * @@ -143,7 +141,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public void removeFollower(String followerId) { followerToLog.remove(followerId); - mapFollowerToSnapshot.remove(followerId); } public void updateMinReplicaCount() { @@ -177,6 +174,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } + @VisibleForTesting + boolean hasSnapshot() { + return snapshot.isPresent(); + } + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { @@ -217,13 +219,36 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion()); boolean updated = false; - if (appendEntriesReply.isSuccess()) { + if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { + // The follower's log is actually ahead of the leader's log. Normally this doesn't happen + // in raft as a node cannot become leader if it's log is behind another's. However, the + // non-voting semantics deviate a bit from raft. Only voting members participate in + // elections and can become leader so it's possible for a non-voting follower to be ahead + // of the leader. This can happen if persistence is disabled and all voting members are + // restarted. In this case, the voting leader will start out with an empty log however + // the non-voting followers still retain the previous data in memory. On the first + // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex + // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned + // lastLogIndex may be higher in which case we want to reset the follower by installing a + // snapshot. It's also possible that the follower's last log index is behind the leader's. + // However in this case the log terms won't match and the logs will conflict - this is handled + // elsewhere. + LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot", + logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(), + context.getReplicatedLog().lastIndex()); + + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + initiateCaptureSnapshot(followerId); + updated = true; + } else if (appendEntriesReply.isSuccess()) { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); - ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); + long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex); if(appendEntriesReply.isForceInstallSnapshot()) { // Reset the followers match and next index. This is to signal that this follower has nothing // in common with this Leader and so would require a snapshot to be installed @@ -232,8 +257,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Force initiate a snapshot capture initiateCaptureSnapshot(followerId); - } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null && - followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { + } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 && + followersLastLogTerm == appendEntriesReply.getLogLastTerm())) { // The follower's log is empty or the last entry is present in the leader's journal // and the terms match so the follower is just behind the leader's journal from // the last snapshot, if any. We'll catch up the follower quickly by starting at the @@ -241,11 +266,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about. + // The follower's log conflicts with leader's log so decrement follower's next index by 1 + // in an attempt to find where the logs match. + + LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index", + logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm); followerLogInformation.decrNextIndex(); } @@ -266,7 +291,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.trace("{}: checking Nth index {}", logName(), N); for (FollowerLogInformation info : followerToLog.values()) { final PeerInfo peerInfo = context.getPeerInfo(info.getId()); - if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { + if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) { replicatedCount++; } else if(LOG.isTraceEnabled()) { LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), @@ -320,6 +345,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + return this; } @@ -372,11 +398,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected void beforeSendHeartbeat(){} @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); - Object message = fromSerializableMessage(originalMessage); - if (message instanceof RaftRPC) { RaftRPC rpc = (RaftRPC) message; // If RPC request or response contains term T > currentTerm: @@ -415,29 +439,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); String followerId = reply.getFollowerId(); - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); - - if (followerToSnapshot == null) { - LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", - logName(), followerId); - return; - } - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); if(followerLogInformation == null) { // This can happen during AddServer if it times out. LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", logName(), followerId); - mapFollowerToSnapshot.remove(followerId); + return; + } + + LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); + if (installSnapshotState == null) { + LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", + logName(), followerId); return; } followerLogInformation.markFollowerActive(); - if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) { boolean wasLastChunk = false; if (reply.isSuccess()) { - if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { + if(installSnapshotState.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshotReply received, " + @@ -450,17 +472,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerMatchIndex = snapshot.get().getLastIncludedIndex(); followerLogInformation.setMatchIndex(followerMatchIndex); followerLogInformation.setNextIndex(followerMatchIndex + 1); - mapFollowerToSnapshot.remove(followerId); + followerLogInformation.clearLeaderInstallSnapshotState(); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", logName(), followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); - if (mapFollowerToSnapshot.isEmpty()) { + if (!anyFollowersInstallingSnapshot()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory setSnapshot(null); } + wasLastChunk = true; if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = @@ -469,19 +492,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self"); } } else { - followerToSnapshot.markSendStatus(true); + installSnapshotState.markSendStatus(true); } } else { LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", logName(), reply.getChunkIndex()); - followerToSnapshot.markSendStatus(false); + installSnapshotState.markSendStatus(false); } if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { // Since the follower is now caught up try to purge the log. purgeInMemoryLog(); - } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if(followerActor != null) { sendSnapshotChunk(followerActor, followerId); @@ -491,31 +514,41 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", logName(), reply.getChunkIndex(), followerId, - followerToSnapshot.getChunkIndex()); + installSnapshotState.getChunkIndex()); - if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){ // Since the Follower did not find this index to be valid we should reset the follower snapshot // so that Installing the snapshot can resume from the beginning - followerToSnapshot.reset(); + installSnapshotState.reset(); + } + } + } + + private boolean anyFollowersInstallingSnapshot() { + for(FollowerLogInformation info: followerToLog.values()) { + if(info.getInstallSnapshotState() != null) { + return true; } + } + + return false; } private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(), - replicate.getIdentifier(), logIndex); + LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); // Create a tracker entry we will use this later to notify the // client actor - trackers.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); + if(replicate.getClientActor() != null) { + trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), + logIndex)); + } - boolean applyModificationToState = followerToLog.isEmpty() + boolean applyModificationToState = !context.anyVotingPeers() || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); if(applyModificationToState){ @@ -558,9 +591,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { boolean sendAppendEntries = false; List entries = Collections.emptyList(); - if (mapFollowerToSnapshot.get(followerId) != null) { + LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); + if (installSnapshotState != null) { // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + if (isFollowerActive && installSnapshotState.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerId); } else if(sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk @@ -595,7 +629,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // then snapshot should be sent if (LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " + "follower-nextIndex: %d, leader-snapshot-index: %d, " + "leader-last-index: %d", logName(), followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); @@ -625,8 +659,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, List entries, String followerId) { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, + getLogEntryIndex(followerNextIndex - 1), + getLogEntryTerm(followerNextIndex - 1), entries, context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion()); if(!entries.isEmpty() || LOG.isTraceEnabled()) { @@ -668,9 +702,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If the follower's nextIndex is -1 then we might as well send it a snapshot // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present // in the snapshot - return (nextIndex == -1 || + return nextIndex == -1 || (!context.getReplicatedLog().isPresent(nextIndex) - && context.getReplicatedLog().isInSnapshot(nextIndex))); + && context.getReplicatedLog().isInSnapshot(nextIndex)); } @@ -703,24 +737,31 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState(); + + int nextChunkIndex = installSnapshotState.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if(installSnapshotState.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), snapshot.get().getLastIncludedIndex(), snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), - followerToSnapshot.getTotalChunks(), - Optional.of(followerToSnapshot.getLastChunkHashCode()) + nextChunkIndex, + installSnapshotState.getTotalChunks(), + Optional.of(installSnapshotState.getLastChunkHashCode()), + serverConfig ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() ); if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), - followerToSnapshot.getTotalChunks()); + logName(), followerActor.path(), installSnapshotState.getChunkIndex(), + installSnapshotState.getTotalChunks()); } } } catch (IOException e) { @@ -733,12 +774,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * creates and return a ByteString chunk */ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); - if (followerToSnapshot == null) { - followerToSnapshot = new FollowerToSnapshot(snapshotBytes); - mapFollowerToSnapshot.put(followerId, followerToSnapshot); + LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState(); + if (installSnapshotState == null) { + installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(), + logName()); + followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState); } - byte[] nextChunk = followerToSnapshot.getNextChunk(); + byte[] nextChunk = installSnapshotState.getNextChunk(); LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length); @@ -803,121 +845,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } } - return (minPresent != 0); - } - - /** - * Encapsulates the snapshot bytestring and handles the logic of sending - * snapshot chunks - */ - protected class FollowerToSnapshot { - private final ByteString snapshotBytes; - private int offset = 0; - // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset - private int replyReceivedForOffset; - // if replyStatus is false, the previous chunk is attempted - private boolean replyStatus = false; - private int chunkIndex; - private final int totalChunks; - private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - - public FollowerToSnapshot(ByteString snapshotBytes) { - this.snapshotBytes = snapshotBytes; - int size = snapshotBytes.size(); - totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + - ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", - logName(), size, totalChunks); - } - replyReceivedForOffset = -1; - chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; - } - - public ByteString getSnapshotBytes() { - return snapshotBytes; - } - - public int incrementOffset() { - if(replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - offset = offset + context.getConfigParams().getSnapshotChunkSize(); - } - return offset; - } - - public int incrementChunkIndex() { - if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - chunkIndex = chunkIndex + 1; - } - return chunkIndex; - } - - public int getChunkIndex() { - return chunkIndex; - } - - public int getTotalChunks() { - return totalChunks; - } - - public boolean canSendNextChunk() { - // we only send a false if a chunk is sent but we have not received a reply yet - return replyReceivedForOffset == offset; - } - - public boolean isLastChunk(int chunkIndex) { - return totalChunks == chunkIndex; - } - - public void markSendStatus(boolean success) { - if (success) { - // if the chunk sent was successful - replyReceivedForOffset = offset; - replyStatus = true; - lastChunkHashCode = nextChunkHashCode; - } else { - // if the chunk sent was failure - replyReceivedForOffset = offset; - replyStatus = false; - } - } - - public byte[] getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); - int start = incrementOffset(); - int size = context.getConfigParams().getSnapshotChunkSize(); - if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { - size = snapshotLength; - } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } - - byte[] nextChunk = new byte[size]; - getSnapshotBytes().copyTo(nextChunk, start, 0, size); - nextChunkHashCode = Arrays.hashCode(nextChunk); - - LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(), - snapshotLength, start, size, nextChunkHashCode); - return nextChunk; - } - - /** - * reset should be called when the Follower needs to be sent the snapshot from the beginning - */ - public void reset(){ - offset = 0; - replyStatus = false; - replyReceivedForOffset = offset; - chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; - lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - } - - public int getLastChunkHashCode() { - return lastChunkHashCode; - } + return minPresent != 0; } // called from example-actor for printing the follower-states @@ -942,16 +870,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.get(followerId); } - @VisibleForTesting - protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) { - mapFollowerToSnapshot.put(followerId, snapshot); - } - - @VisibleForTesting - public int followerSnapshotSize() { - return mapFollowerToSnapshot.size(); - } - @VisibleForTesting public int followerLogSize() { return followerToLog.size();