X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=a63c62fa30740b5830676ab6f15f3de9c1988e7b;hp=9e4f3b46c45b5de54458770b910b63f6c70305a8;hb=3e77d4e181b0024936084e10d55ae0d7f285b5ad;hpb=1f1a33293f4585f9767751b6e20861f6bae53a5b diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 9e4f3b46c4..a63c62fa30 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -91,17 +91,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; - private long replicatedToAllIndex = -1; - public AbstractLeader(RaftActorContext context) { - super(context); + super(context, RaftState.Leader); final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - context.getCommitIndex(), -1, - context.getConfigParams().getElectionTimeOutInterval()); + new FollowerLogInformationImpl(followerId, -1, context); ftlBuilder.put(followerId, followerLogInformation); } @@ -109,7 +105,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { leaderId = context.getId(); - LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds()); + LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); @@ -127,7 +123,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to // prevent election timeouts (§5.2) - sendAppendEntries(0); + sendAppendEntries(0, false); + + // It is important to schedule this heartbeat here + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } /** @@ -135,14 +134,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * * @return Collection of follower IDs */ - protected final Collection getFollowerIds() { + public final Collection getFollowerIds() { return followerToLog.keySet(); } - private Optional getSnapshot() { - return snapshot; - } - @VisibleForTesting void setSnapshot(Optional snapshot) { this.snapshot = snapshot; @@ -152,9 +147,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries); - } + LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); return this; } @@ -163,10 +156,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - if(! appendEntriesReply.isSuccess()) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply); - } + if(LOG.isTraceEnabled()) { + LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); } // Update the FollowerLogInformation @@ -175,18 +166,31 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId); + LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); return this; } + if(followerLogInformation.timeSinceLastActivity() > + context.getConfigParams().getElectionTimeOutInterval().toMillis()) { + LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", + logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), + context.getLastApplied(), context.getCommitIndex()); + } + followerLogInformation.markFollowerActive(); + boolean updated = false; if (appendEntriesReply.isSuccess()) { - followerLogInformation - .setMatchIndex(appendEntriesReply.getLogLastIndex()); - followerLogInformation - .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); + updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); + updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; + + if(updated && LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(), + followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); + } } else { + LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); // TODO: When we find that the follower is out of sync with the // Leader we simply decrement that followers next index by 1. @@ -223,6 +227,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { + if(LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied()); + } + applyLogToStateMachine(context.getCommitIndex()); } @@ -231,20 +240,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false); + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); return this; } private void purgeInMemoryLog() { - //find the lowest index across followers which has been replicated to all. -1 if there are no followers. + //find the lowest index across followers which has been replicated to all. + // lastApplied if there are no followers, so that we keep clearing the log for single-node // we would delete the in-mem log from that index on, in-order to minimize mem usage // we would also share this info thru AE with the followers so that they can delete their log entries as well. - long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE; + long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE; for (FollowerLogInformation info : followerToLog.values()) { minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); } - replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex); + super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } @Override @@ -277,10 +287,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return this; } - @Override - public RaftState state() { - return RaftState.Leader; - } + protected void beforeSendHeartbeat(){} @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { @@ -294,8 +301,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(), - rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); + LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", + logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); @@ -303,37 +310,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - try { - if (message instanceof SendHeartBeat) { - sendHeartBeat(); - return this; + if (message instanceof SendHeartBeat) { + beforeSendHeartbeat(); + sendHeartBeat(); + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + return this; - } else if(message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); - sendInstallSnapshot(); + } else if(message instanceof SendInstallSnapshot) { + // received from RaftActor + setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + sendInstallSnapshot(); - } else if (message instanceof Replicate) { - replicate((Replicate) message); + } else if (message instanceof Replicate) { + replicate((Replicate) message); - } else if (message instanceof InstallSnapshotReply){ - handleInstallSnapshotReply((InstallSnapshotReply) message); + } else if (message instanceof InstallSnapshotReply){ + handleInstallSnapshotReply((InstallSnapshotReply) message); - } - } finally { - scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + return super.handleMessage(sender, message); } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { + LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); + String followerId = reply.getFollowerId(); FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", - context.getId(), followerId); + logName(), followerId); return; } @@ -347,8 +355,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { //this was the last chunk reply if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshotReply received, " + - "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - context.getId(), reply.getChunkIndex(), followerId, + "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}", + logName(), reply.getChunkIndex(), followerId, context.getReplicatedLog().getSnapshotIndex() + 1 ); } @@ -359,10 +367,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotIndex() + 1); mapFollowerToSnapshot.remove(followerId); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" + - context.getId(), followerToLog.get(followerId).getNextIndex()); - } + LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", + logName(), followerId, followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots @@ -376,12 +383,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } else { LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", - context.getId(), reply.getChunkIndex()); + logName(), reply.getChunkIndex()); followerToSnapshot.markSendStatus(false); } - if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + if (wasLastChunk && !context.isSnapshotCaptureInitiated()) { + // Since the follower is now caught up try to purge the log. + purgeInMemoryLog(); + } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if(followerActor != null) { sendSnapshotChunk(followerActor, followerId); @@ -390,7 +400,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", - context.getId(), reply.getChunkIndex(), followerId, + logName(), reply.getChunkIndex(), followerId, followerToSnapshot.getChunkIndex()); if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ @@ -404,9 +414,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Replicate message {}", context.getId(), logIndex); - } + LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(), + replicate.getIdentifier(), logIndex); // Create a tracker entry we will use this later to notify the // client actor @@ -420,11 +429,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } else { - sendAppendEntries(0); + sendAppendEntries(0, false); } } - private void sendAppendEntries(long timeSinceLastActivityInterval) { + private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { // Send an AppendEntries to all followers for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); @@ -432,7 +441,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // This checks helps not to send a repeat message to the follower if(!followerLogInformation.isFollowerActive() || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { - sendUpdatesToFollower(followerId, followerLogInformation, true); + sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat); } } } @@ -445,12 +454,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { */ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, - boolean sendHeartbeat) { + boolean sendHeartbeat, boolean isHeartbeat) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); + boolean sendAppendEntries = false; + List entries = Collections.EMPTY_LIST; if (mapFollowerToSnapshot.get(followerId) != null) { // if install snapshot is in process , then sent next chunk if possible @@ -458,47 +469,56 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendSnapshotChunk(followerActor, followerId); } else if(sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { - // FIXME : Sending one entry at a time - final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); + if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { + LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); + } + + if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { + + LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), + followerNextIndex, followerId); + // FIXME : Sending one entry at a time + if(followerLogInformation.okToReplicate()) { + entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + sendAppendEntries = true; + } } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex) { + leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent if (LOG.isDebugEnabled()) { - LOG.debug("InitiateInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + "follower-nextIndex: %d, leader-snapshot-index: %d, " + + "leader-last-index: %d", logName(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); } // Send heartbeat to follower whenever install snapshot is initiated. - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); - + sendAppendEntries = true; initiateCaptureSnapshot(followerId, followerNextIndex); } else if(sendHeartbeat) { - //we send an AppendEntries, even if the follower is inactive + // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } + + if(sendAppendEntries) { + sendAppendEntriesToFollower(followerActor, followerNextIndex, + entries, followerId); + } } } @@ -507,10 +527,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), replicatedToAllIndex); + context.getCommitIndex(), super.getReplicatedToAllIndex()); - if(!entries.isEmpty()) { - LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, + if(!entries.isEmpty() || LOG.isTraceEnabled()) { + LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId, appendEntries); } @@ -518,7 +538,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } /** - * /** * Install Snapshot works as follows * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log @@ -533,10 +552,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * @param followerNextIndex */ private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet()); - } - if (!context.getReplicatedLog().isPresent(followerNextIndex) && context.getReplicatedLog().isInSnapshot(followerNextIndex)) { @@ -549,7 +564,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else if (!context.isSnapshotCaptureInitiated()) { - LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId()); ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -557,15 +571,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (lastAppliedEntry != null) { lastAppliedIndex = lastAppliedEntry.getIndex(); lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { + } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); } boolean isInstallSnapshotInitiated = true; - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), - lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), - actor()); + long replicatedToAllIndex = super.getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + + CaptureSnapshot captureSnapshot = new CaptureSnapshot( + lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), + isInstallSnapshotInitiated); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId, + captureSnapshot); + } + + actor().tell(captureSnapshot, actor()); context.setSnapshotCaptureInitiated(true); } } @@ -573,6 +599,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendInstallSnapshot() { + LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); @@ -605,19 +632,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), - followerToSnapshot.getTotalChunks(), + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() ); - LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - context.getId(), followerActor.path(), - followerToSnapshot.getChunkIndex(), - followerToSnapshot.getTotalChunks()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", + logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), + followerToSnapshot.getTotalChunks()); + } } } catch (IOException e) { - LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e); + LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e); } } @@ -632,15 +661,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.put(followerId, followerToSnapshot); } ByteString nextChunk = followerToSnapshot.getNextChunk(); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size()); - } + + LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size()); + return nextChunk; } private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis()); + LOG.trace("{}: Sending heartbeat", logName()); + sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true); } } @@ -715,7 +745,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); if(LOG.isDebugEnabled()) { LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", - context.getId(), size, totalChunks); + logName(), size, totalChunks); } replyReceivedForOffset = -1; chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; @@ -783,10 +813,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(), + + LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(), snapshotLength, start, size); - } + ByteString substring = getSnapshotBytes().substring(start, start + size); nextChunkHashCode = substring.hashCode(); return substring;