X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=be51ba069cc5056636646566d1db00b30154073a;hp=68cf5027dff1244d145e93dac28f2680ed9ee20a;hb=7d03757a62c1909ee74b42e6acf30d3d7b961bfa;hpb=d1daa68ccdb73666524ae3673ee24361530b285b diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 68cf5027df..be51ba069c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -35,7 +34,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -93,10 +91,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; - private long replicatedToAllIndex = -1; - public AbstractLeader(RaftActorContext context) { - super(context); + super(context, RaftState.Leader); final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { @@ -111,7 +107,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { leaderId = context.getId(); - LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds()); + LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); @@ -129,7 +125,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + sendAppendEntries(0, false); + + // It is important to schedule this heartbeat here + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } /** @@ -141,10 +140,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } - private Optional getSnapshot() { - return snapshot; - } - @VisibleForTesting void setSnapshot(Optional snapshot) { this.snapshot = snapshot; @@ -154,9 +149,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries); - } + LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); return this; } @@ -165,10 +158,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - if(! appendEntriesReply.isSuccess()) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply); - } + if(LOG.isTraceEnabled()) { + LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); + } else if(LOG.isDebugEnabled() && !appendEntriesReply.isSuccess()) { + LOG.debug("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); } // Update the FollowerLogInformation @@ -177,10 +170,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId); + LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); return this; } + if(followerLogInformation.timeSinceLastActivity() > + context.getConfigParams().getElectionTimeOutInterval().toMillis()) { + LOG.error("{} : handleAppendEntriesReply delayed beyond election timeout, " + + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", + logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), + context.getLastApplied(), context.getCommitIndex()); + } + followerLogInformation.markFollowerActive(); if (appendEntriesReply.isSuccess()) { @@ -225,6 +226,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { + LOG.debug("{}: handleAppendEntriesReply: applying to log - commitIndex: {}, lastAppliedIndex: {}", + logName(), context.getCommitIndex(), context.getLastApplied()); + applyLogToStateMachine(context.getCommitIndex()); } @@ -233,20 +237,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false); + sendUpdatesToFollower(followerId, followerLogInformation, false, false); return this; } private void purgeInMemoryLog() { - //find the lowest index across followers which has been replicated to all. -1 if there are no followers. + //find the lowest index across followers which has been replicated to all. + // lastApplied if there are no followers, so that we keep clearing the log for single-node // we would delete the in-mem log from that index on, in-order to minimize mem usage // we would also share this info thru AE with the followers so that they can delete their log entries as well. - long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE; + long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE; for (FollowerLogInformation info : followerToLog.values()) { minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); } - replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex); + super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } @Override @@ -279,10 +284,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return this; } - @Override - public RaftState state() { - return RaftState.Leader; - } + protected void beforeSendHeartbeat(){} @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { @@ -296,8 +298,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(), - rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); + LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", + logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); @@ -305,40 +307,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - try { - if (message instanceof SendHeartBeat) { - sendHeartBeat(); - return this; - - } else if(message instanceof InitiateInstallSnapshot) { - installSnapshotIfNeeded(); + if (message instanceof SendHeartBeat) { + beforeSendHeartbeat(); + sendHeartBeat(); + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + return this; - } else if(message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); - sendInstallSnapshot(); + } else if(message instanceof SendInstallSnapshot) { + // received from RaftActor + setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + sendInstallSnapshot(); - } else if (message instanceof Replicate) { - replicate((Replicate) message); + } else if (message instanceof Replicate) { + replicate((Replicate) message); - } else if (message instanceof InstallSnapshotReply){ - handleInstallSnapshotReply((InstallSnapshotReply) message); + } else if (message instanceof InstallSnapshotReply){ + handleInstallSnapshotReply((InstallSnapshotReply) message); - } - } finally { - scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + return super.handleMessage(sender, message); } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { + LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); + String followerId = reply.getFollowerId(); FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", - context.getId(), followerId); + logName(), followerId); return; } @@ -346,13 +346,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.markFollowerActive(); if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + boolean wasLastChunk = false; if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshotReply received, " + - "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - context.getId(), reply.getChunkIndex(), followerId, + "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}", + logName(), reply.getChunkIndex(), followerId, context.getReplicatedLog().getSnapshotIndex() + 1 ); } @@ -363,29 +364,37 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotIndex() + 1); mapFollowerToSnapshot.remove(followerId); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" + - context.getId(), followerToLog.get(followerId).getNextIndex()); - } + LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", + logName(), followerId, followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory setSnapshot(Optional.absent()); } + wasLastChunk = true; } else { followerToSnapshot.markSendStatus(true); } } else { LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", - context.getId(), reply.getChunkIndex()); + logName(), reply.getChunkIndex()); followerToSnapshot.markSendStatus(false); } + + if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + ActorSelection followerActor = context.getPeerActorSelection(followerId); + if(followerActor != null) { + sendSnapshotChunk(followerActor, followerId); + } + } + } else { LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", - context.getId(), reply.getChunkIndex(), followerId, + logName(), reply.getChunkIndex(), followerId, followerToSnapshot.getChunkIndex()); if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ @@ -399,9 +408,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Replicate message {}", context.getId(), logIndex); - } + LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(), + replicate.getIdentifier(), logIndex); // Create a tracker entry we will use this later to notify the // client actor @@ -415,19 +423,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } else { - sendAppendEntries(); + sendAppendEntries(0, false); } } - private void sendAppendEntries() { + private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { // Send an AppendEntries to all followers - long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis(); for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); final FollowerLogInformation followerLogInformation = e.getValue(); // This checks helps not to send a repeat message to the follower - if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) { - sendUpdatesToFollower(followerId, followerLogInformation, true); + if(!followerLogInformation.isFollowerActive() || + followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { + sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat); } } } @@ -440,7 +448,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { */ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, - boolean sendHeartbeat) { + boolean sendHeartbeat, boolean isHeartbeat) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { @@ -459,32 +467,41 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { + + if(!isHeartbeat || LOG.isTraceEnabled()) { + LOG.debug("{}: Checking sendAppendEntries for follower {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + logName(), followerId, leaderLastIndex, leaderSnapShotIndex); + } + + if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { + + LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), + followerNextIndex, followerId); + // FIXME : Sending one entry at a time final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex) { + leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent if (LOG.isDebugEnabled()) { - LOG.debug("InitiateInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + "follower-nextIndex: %d, leader-snapshot-index: %d, " + + "leader-last-index: %d", logName(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); } - actor().tell(new InitiateInstallSnapshot(), actor()); // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), Collections.emptyList(), followerId); + initiateCaptureSnapshot(followerId, followerNextIndex); + } else if(sendHeartbeat) { //we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again @@ -501,10 +518,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), replicatedToAllIndex); + context.getCommitIndex(), super.getReplicatedToAllIndex()); - if(!entries.isEmpty()) { - LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, + if(!entries.isEmpty() || LOG.isTraceEnabled()) { + LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId, appendEntries); } @@ -512,77 +529,60 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } /** - * An installSnapshot is scheduled at a interval that is a multiple of - * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing - * snapshots at every heartbeat. - * * Install Snapshot works as follows - * 1. Leader sends a InitiateInstallSnapshot message to self - * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor - * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log + * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor + * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log * and makes a call to Leader's handleMessage , with SendInstallSnapshot message. - * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower - * 5. On complete, Follower sends back a InstallSnapshotReply. - * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower + * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower + * 4. On complete, Follower sends back a InstallSnapshotReply. + * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower * and replenishes the memory by deleting the snapshot in Replicated log. - * + * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply) + * then send the existing snapshot in chunks to the follower. + * @param followerId + * @param followerNextIndex */ - private void installSnapshotIfNeeded() { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet()); - } - - for (Entry e : followerToLog.entrySet()) { - final ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); - - if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey()); - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot - sendSnapshotChunk(followerActor, e.getKey()); - - } else { - initiateCaptureSnapshot(); - //we just need 1 follower who would need snapshot to be installed. - // when we have the snapshot captured, we would again check (in SendInstallSnapshot) - // who needs an install and send to all who need - break; - } + private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { + if (!context.getReplicatedLog().isPresent(followerNextIndex) && + context.getReplicatedLog().isInSnapshot(followerNextIndex)) { + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot. + // This could happen if another follower needs an install when one is going on. + final ActorSelection followerActor = context.getPeerActorSelection(followerId); + sendSnapshotChunk(followerActor, followerId); + + } else if (!context.isSnapshotCaptureInitiated()) { + + LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", logName(), getLeaderId()); + ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); + long lastAppliedIndex = -1; + long lastAppliedTerm = -1; + + if (lastAppliedEntry != null) { + lastAppliedIndex = lastAppliedEntry.getIndex(); + lastAppliedTerm = lastAppliedEntry.getTerm(); + } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { + lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); + lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); } + + boolean isInstallSnapshotInitiated = true; + long replicatedToAllIndex = super.getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), + isInstallSnapshotInitiated), actor()); + context.setSnapshotCaptureInitiated(true); } } } - // on every install snapshot, we try to capture the snapshot. - // Once a capture is going on, another one issued will get ignored by RaftActor. - private void initiateCaptureSnapshot() { - LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId()); - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - - boolean isInstallSnapshotInitiated = true; - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), - lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), - actor()); - } - private void sendInstallSnapshot() { + LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); @@ -622,12 +622,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { actor() ); LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - context.getId(), followerActor.path(), + logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), followerToSnapshot.getTotalChunks()); } } catch (IOException e) { - LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId()); + LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e); } } @@ -642,15 +642,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.put(followerId, followerToSnapshot); } ByteString nextChunk = followerToSnapshot.getNextChunk(); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size()); - } + + LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size()); + return nextChunk; } private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - sendAppendEntries(); + LOG.trace("{}: Sending heartbeat", logName()); + sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true); } } @@ -725,7 +726,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); if(LOG.isDebugEnabled()) { LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", - context.getId(), size, totalChunks); + logName(), size, totalChunks); } replyReceivedForOffset = -1; chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; @@ -793,10 +794,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(), + + LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(), snapshotLength, start, size); - } + ByteString substring = getSnapshotBytes().substring(start, start + size); nextChunkHashCode = substring.hashCode(); return substring;