X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=befc6d412962a7395608c4ee1981b02014f5986d;hb=8beb0dc3d3727608acddf77889cc0a5275e415af;hp=d4612dbedafb91f7015e7d66caf8d2735f45a8f2;hpb=a3adcd6cd7659b30e5115efe86440f7a2123ec20;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index d4612dbeda..befc6d4129 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -16,6 +16,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -24,16 +25,18 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; +import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -44,6 +47,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import scala.concurrent.duration.FiniteDuration; /** @@ -82,33 +86,35 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Map followerToLog = new HashMap<>(); private final Map mapFollowerToSnapshot = new HashMap<>(); - private Cancellable heartbeatSchedule = null; - - private final Collection trackerList = new LinkedList<>(); - - private int minReplicationCount; - - private int minIsolatedLeaderPeerCount; + /** + * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really + * expect the entries to be modified in sequence, hence we open-code the lookup. + * + * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly, + * but we already expect those to be far from frequent. + */ + private final Queue trackers = new LinkedList<>(); + private Cancellable heartbeatSchedule = null; private Optional snapshot; + private int minReplicationCount; - public AbstractLeader(RaftActorContext context) { - super(context, RaftState.Leader); - - setLeaderPayloadVersion(context.getPayloadVersion()); - - for (String followerId : context.getPeerAddresses().keySet()) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, -1, context); + protected AbstractLeader(RaftActorContext context, RaftState state, + @Nullable AbstractLeader initializeFromLeader) { + super(context, state); - followerToLog.put(followerId, followerLogInformation); + if(initializeFromLeader != null) { + followerToLog.putAll(initializeFromLeader.followerToLog); + } else { + for(PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); + } } - leaderId = context.getId(); - LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); - updateMinReplicaCountAndMinIsolatedLeaderPeerCount(); + updateMinReplicaCount(); snapshot = Optional.absent(); @@ -122,6 +128,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + protected AbstractLeader(RaftActorContext context, RaftState state) { + this(context, state, null); + } + /** * Return an immutable collection of follower identifiers. * @@ -131,9 +141,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } - public void addFollower(String followerId, FollowerState followerState) { - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context); - followerLogInformation.setFollowerState(followerState); + public void addFollower(String followerId) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl( + context.getPeerInfo(followerId), -1, context); followerToLog.put(followerId, followerLogInformation); if(heartbeatSchedule == null) { @@ -143,22 +153,29 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public void removeFollower(String followerId) { followerToLog.remove(followerId); + mapFollowerToSnapshot.remove(followerId); } - public void updateMinReplicaCountAndMinIsolatedLeaderPeerCount(){ - minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); + public void updateMinReplicaCount() { + int numVoting = 0; + for(PeerInfo peer: context.getPeers()) { + if(peer.isVoting()) { + numVoting++; + } + } - //the isolated Leader peer count will be 1 less than the majority vote count. + minReplicationCount = getMajorityVoteCount(numVoting); + } + + protected int getMinIsolatedLeaderPeerCount(){ + //the isolated Leader peer count will be 1 less than the majority vote count. //this is because the vote count has the self vote counted in it //for e.g //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 - minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0; - } - protected int getMinIsolatedLeaderPeerCount(){ - return minIsolatedLeaderPeerCount; + return minReplicationCount > 0 ? (minReplicationCount - 1) : 0; } @VisibleForTesting @@ -207,15 +224,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.markFollowerActive(); followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); + followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion()); boolean updated = false; - if (appendEntriesReply.isSuccess()) { + if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { + // The follower's log is actually ahead of the leader's log. Normally this doesn't happen + // in raft as a node cannot become leader if it's log is behind another's. However, the + // non-voting semantics deviate a bit from raft. Only voting members participate in + // elections and can become leader so it's possible for a non-voting follower to be ahead + // of the leader. This can happen if persistence is disabled and all voting members are + // restarted. In this case, the voting leader will start out with an empty log however + // the non-voting followers still retain the previous data in memory. On the first + // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex + // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned + // lastLogIndex may be higher in which case we want to reset the follower by installing a + // snapshot. It's also possible that the follower's last log index is behind the leader's. + // However in this case the log terms won't match and the logs will conflict - this is handled + // elsewhere. + LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot", + logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(), + context.getReplicatedLog().lastIndex()); + + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + initiateCaptureSnapshot(followerId); + updated = true; + } else if (appendEntriesReply.isSuccess()) { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); - ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); + long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex); if(appendEntriesReply.isForceInstallSnapshot()) { // Reset the followers match and next index. This is to signal that this follower has nothing // in common with this Leader and so would require a snapshot to be installed @@ -224,8 +265,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Force initiate a snapshot capture initiateCaptureSnapshot(followerId); - } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null && - followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { + } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 && + followersLastLogTerm == appendEntriesReply.getLogLastTerm())) { // The follower's log is empty or the last entry is present in the leader's journal // and the terms match so the follower is just behind the leader's journal from // the last snapshot, if any. We'll catch up the follower quickly by starting at the @@ -233,11 +274,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about. + // The follower's log conflicts with leader's log so decrement follower's next index by 1 + // in an attempt to find where the logs match. + + LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index", + logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm); followerLogInformation.decrNextIndex(); } @@ -247,23 +288,51 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If there exists an N such that N > commitIndex, a majority // of matchIndex[i] ≥ N, and log[N].term == currentTerm: // set commitIndex = N (§5.3, §5.4). + if(LOG.isTraceEnabled()) { + LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); + } + for (long N = context.getCommitIndex() + 1; ; N++) { int replicatedCount = 1; + LOG.trace("{}: checking Nth index {}", logName(), N); for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex() >= N) { + final PeerInfo peerInfo = context.getPeerInfo(info.getId()); + if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { replicatedCount++; + } else if(LOG.isTraceEnabled()) { + LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), + info.getMatchIndex(), peerInfo); } } + if(LOG.isTraceEnabled()) { + LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount); + } + if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); - if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) { + if (replicatedLogEntry == null) { + LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size()); + break; + } + + // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: + // "Raft never commits log entries from previous terms by counting replicas". + // However we keep looping so we can make progress when new entries in the current term + // reach consensus, as per §5.4.1: "once an entry from the current term is committed by + // counting replicas, then all prior entries are committed indirectly". + if (replicatedLogEntry.getTerm() == currentTerm()) { + LOG.trace("{}: Setting commit index to {}", logName(), N); context.setCommitIndex(N); } else { - break; + LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}", + logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); } } else { + LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); break; } } @@ -284,6 +353,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + return this; } @@ -315,7 +385,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { @Override protected ClientRequestTracker removeClientRequestTracker(long logIndex) { - final Iterator it = trackerList.iterator(); + final Iterator it = trackers.iterator(); while (it.hasNext()) { final ClientRequestTracker t = it.next(); if (t.getIndex() == logIndex) { @@ -327,16 +397,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return null; } - @Override - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - for (ClientRequestTracker tracker : trackerList) { - if (tracker.getIndex() == logIndex) { - return tracker; - } - } - return null; - } - @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { @@ -346,11 +406,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected void beforeSendHeartbeat(){} @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); - Object message = fromSerializableMessage(originalMessage); - if (message instanceof RaftRPC) { RaftRPC rpc = (RaftRPC) message; // If RPC request or response contains term T > currentTerm: @@ -370,23 +428,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { beforeSendHeartbeat(); sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); - return this; - } else if(message instanceof SendInstallSnapshot) { // received from RaftActor setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); - } else if (message instanceof Replicate) { replicate((Replicate) message); - - } else if (message instanceof InstallSnapshotReply){ + } else if (message instanceof InstallSnapshotReply) { handleInstallSnapshotReply((InstallSnapshotReply) message); - + } else { + return super.handleMessage(sender, message); } - - return super.handleMessage(sender, message); + return this; } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { @@ -396,12 +450,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { - LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + if(followerLogInformation == null) { + // This can happen during AddServer if it times out. + LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", + logName(), followerId); + mapFollowerToSnapshot.remove(followerId); + return; + } + followerLogInformation.markFollowerActive(); if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { @@ -432,8 +494,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { setSnapshot(null); } wasLastChunk = true; - FollowerState followerState = followerLogInformation.getFollowerState(); - if(followerState == FollowerState.VOTING_NOT_INITIALIZED){ + if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = new UnInitializedFollowerSnapshotReply(followerId); context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); @@ -475,18 +536,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(), - replicate.getIdentifier(), logIndex); + LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); // Create a tracker entry we will use this later to notify the // client actor - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); + if(replicate.getClientActor() != null) { + trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), + logIndex)); + } - boolean applyModificationToState = followerToLog.isEmpty() + boolean applyModificationToState = !context.anyVotingPeers() || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); if(applyModificationToState){ @@ -499,7 +559,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { + protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { // Send an AppendEntries to all followers for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); @@ -566,7 +626,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // then snapshot should be sent if (LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " + "follower-nextIndex: %d, leader-snapshot-index: %d, " + "leader-last-index: %d", logName(), followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); @@ -596,8 +656,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, List entries, String followerId) { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, + getLogEntryIndex(followerNextIndex - 1), + getLogEntryTerm(followerNextIndex - 1), entries, context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion()); if(!entries.isEmpty() || LOG.isTraceEnabled()) { @@ -605,7 +665,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { appendEntries); } - followerActor.tell(appendEntries.toSerializable(), actor()); + followerActor.tell(appendEntries, actor()); } /** @@ -621,17 +681,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * then send the existing snapshot in chunks to the follower. * @param followerId */ - public void initiateCaptureSnapshot(String followerId) { + public boolean initiateCaptureSnapshot(String followerId) { if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress // no need to capture snapshot. // This could happen if another follower needs an install when one is going on. final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - - + return true; } else { - context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), this.getReplicatedToAllIndex(), followerId); } } @@ -650,14 +709,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendInstallSnapshot() { LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { - ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); + String followerId = e.getKey(); + ActorSelection followerActor = context.getPeerActorSelection(followerId); FollowerLogInformation followerLogInfo = e.getValue(); if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex(); - if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED || + long nextIndex = followerLogInfo.getNextIndex(); + if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED || canInstallSnapshot(nextIndex)) { - sendSnapshotChunk(followerActor, e.getKey()); + sendSnapshotChunk(followerActor, followerId); } } } @@ -670,21 +730,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); + byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + int nextChunkIndex = followerToSnapshot.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if(followerToSnapshot.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } + followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), snapshot.get().getLastIncludedIndex(), snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), + nextChunkIndex, followerToSnapshot.getTotalChunks(), - Optional.of(followerToSnapshot.getLastChunkHashCode()) - ).toSerializable(), + Optional.of(followerToSnapshot.getLastChunkHashCode()), + serverConfig + ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() ); @@ -703,15 +770,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * Acccepts snaphot as ByteString, enters into map for future chunks * creates and return a ByteString chunk */ - private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { + private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { followerToSnapshot = new FollowerToSnapshot(snapshotBytes); mapFollowerToSnapshot.put(followerId, followerToSnapshot); } - ByteString nextChunk = followerToSnapshot.getNextChunk(); + byte[] nextChunk = followerToSnapshot.getNextChunk(); - LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size()); + LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length); return nextChunk; } @@ -744,27 +811,33 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // need to be sent if there are other messages being sent to the remote // actor. heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( - interval, context.getActor(), new SendHeartBeat(), + interval, context.getActor(), SendHeartBeat.INSTANCE, context.getActorSystem().dispatcher(), context.getActor()); } @Override - public void close() throws Exception { + public void close() { stopHeartBeat(); } @Override - public String getLeaderId() { + public final String getLeaderId() { return context.getId(); } + @Override + public final short getLeaderPayloadVersion() { + return context.getPayloadVersion(); + } + protected boolean isLeaderIsolated() { - int minPresent = minIsolatedLeaderPeerCount; + int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { - if (followerLogInformation.isFollowerActive()) { + final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId()); + if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { --minPresent; if (minPresent == 0) { - break; + return false; } } } @@ -850,25 +923,23 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - public ByteString getNextChunk() { + public byte[] getNextChunk() { int snapshotLength = getSnapshotBytes().size(); int start = incrementOffset(); int size = context.getConfigParams().getSnapshotChunkSize(); if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { size = snapshotLength; - } else { - if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } + } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { + size = snapshotLength - start; } + byte[] nextChunk = new byte[size]; + getSnapshotBytes().copyTo(nextChunk, start, 0, size); + nextChunkHashCode = Arrays.hashCode(nextChunk); - LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(), - snapshotLength, start, size); - - ByteString substring = getSnapshotBytes().substring(start, start + size); - nextChunkHashCode = substring.hashCode(); - return substring; + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(), + snapshotLength, start, size, nextChunkHashCode); + return nextChunk; } /**