X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=85f2f153ab3046d6541225c95d3863be3435b6b0;hp=be51ba069cc5056636646566d1db00b30154073a;hb=c68d251880d95d6d2f8df70c67d2cdd3a3a47685;hpb=7d03757a62c1909ee74b42e6acf30d3d7b961bfa diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index be51ba069c..85f2f153ab 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -33,7 +34,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -89,17 +90,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected final int minIsolatedLeaderPeerCount; - private Optional snapshot; + private Optional snapshot; public AbstractLeader(RaftActorContext context) { super(context, RaftState.Leader); + setLeaderPayloadVersion(context.getPayloadVersion()); + final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - context.getCommitIndex(), -1, - context.getConfigParams().getElectionTimeOutInterval()); + new FollowerLogInformationImpl(followerId, -1, context); ftlBuilder.put(followerId, followerLogInformation); } @@ -136,13 +137,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * * @return Collection of follower IDs */ - protected final Collection getFollowerIds() { + public final Collection getFollowerIds() { return followerToLog.keySet(); } @VisibleForTesting - void setSnapshot(Optional snapshot) { - this.snapshot = snapshot; + void setSnapshot(@Nullable Snapshot snapshot) { + if(snapshot != null) { + this.snapshot = Optional.of(new SnapshotHolder(snapshot)); + } else { + this.snapshot = Optional.absent(); + } } @Override @@ -160,8 +165,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(LOG.isTraceEnabled()) { LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); - } else if(LOG.isDebugEnabled() && !appendEntriesReply.isSuccess()) { - LOG.debug("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); } // Update the FollowerLogInformation @@ -176,28 +179,40 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(followerLogInformation.timeSinceLastActivity() > context.getConfigParams().getElectionTimeOutInterval().toMillis()) { - LOG.error("{} : handleAppendEntriesReply delayed beyond election timeout, " + + LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), context.getLastApplied(), context.getCommitIndex()); } followerLogInformation.markFollowerActive(); + followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); + boolean updated = false; if (appendEntriesReply.isSuccess()) { - followerLogInformation - .setMatchIndex(appendEntriesReply.getLogLastIndex()); - followerLogInformation - .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { + LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); + + long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); + ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); + if(followerLastLogIndex < 0 || (followersLastLogEntry != null && + followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { + // The follower's log is empty or the last entry is present in the leader's journal + // and the terms match so the follower is just behind the leader's journal from + // the last snapshot, if any. We'll catch up the follower quickly by starting at the + // follower's last log index. + + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); + } else { + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about. - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about - - followerLogInformation.decrNextIndex(); + followerLogInformation.decrNextIndex(); + } } // Now figure out if this reply warrants a change in the commitIndex @@ -226,21 +241,36 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { - LOG.debug("{}: handleAppendEntriesReply: applying to log - commitIndex: {}, lastAppliedIndex: {}", - logName(), context.getCommitIndex(), context.getLastApplied()); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied()); + } applyLogToStateMachine(context.getCommitIndex()); } - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false, false); + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); return this; } + private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation, + AppendEntriesReply appendEntriesReply) { + boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); + updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; + + if(updated && LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", + logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); + } + return updated; + } + private void purgeInMemoryLog() { //find the lowest index across followers which has been replicated to all. // lastApplied if there are no followers, so that we keep clearing the log for single-node @@ -303,7 +333,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return switchBehavior(new Follower(context)); + return internalSwitchBehavior(RaftState.Follower); } } @@ -315,7 +345,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else if(message instanceof SendInstallSnapshot) { // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); } else if (message instanceof Replicate) { @@ -358,10 +388,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ); } - followerLogInformation.setMatchIndex( - context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation.setNextIndex( - context.getReplicatedLog().getSnapshotIndex() + 1); + long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + followerLogInformation.setMatchIndex(followerMatchIndex); + followerLogInformation.setNextIndex(followerMatchIndex + 1); mapFollowerToSnapshot.remove(followerId); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", @@ -371,7 +400,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory - setSnapshot(Optional.absent()); + setSnapshot(null); } wasLastChunk = true; @@ -385,7 +414,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } - if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { + // Since the follower is now caught up try to purge the log. + purgeInMemoryLog(); + } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if(followerActor != null) { sendSnapshotChunk(followerActor, followerId); @@ -419,10 +451,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logIndex) ); - if (followerToLog.isEmpty()) { + boolean applyModificationToState = followerToLog.isEmpty() + || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); + + if(applyModificationToState){ context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); - } else { + } + + if (!followerToLog.isEmpty()) { sendAppendEntries(0, false); } } @@ -454,6 +491,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); + boolean sendAppendEntries = false; + List entries = Collections.emptyList(); if (mapFollowerToSnapshot.get(followerId) != null) { // if install snapshot is in process , then sent next chunk if possible @@ -461,16 +500,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendSnapshotChunk(followerActor, followerId); } else if(sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if(!isHeartbeat || LOG.isTraceEnabled()) { - LOG.debug("{}: Checking sendAppendEntries for follower {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - logName(), followerId, leaderLastIndex, leaderSnapShotIndex); + if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { + LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); } if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { @@ -478,13 +516,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), followerNextIndex, followerId); - // FIXME : Sending one entry at a time - final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); - + if(followerLogInformation.okToReplicate()) { + // Try to send all the entries in the journal but not exceeding the max data size + // for a single AppendEntries message. + int maxEntries = (int) context.getReplicatedLog().size(); + entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries, + context.getConfigParams().getSnapshotChunkSize()); + sendAppendEntries = true; + } } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { + leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -497,19 +538,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } // Send heartbeat to follower whenever install snapshot is initiated. - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); - + sendAppendEntries = true; initiateCaptureSnapshot(followerId, followerNextIndex); } else if(sendHeartbeat) { - //we send an AppendEntries, even if the follower is inactive + // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } + + if(sendAppendEntries) { + sendAppendEntriesToFollower(followerActor, followerNextIndex, + entries, followerId); + } } } @@ -518,7 +561,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), super.getReplicatedToAllIndex()); + context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion()); if(!entries.isEmpty() || LOG.isTraceEnabled()) { LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId, @@ -553,29 +596,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - } else if (!context.isSnapshotCaptureInitiated()) { - - LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", logName(), getLeaderId()); - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - boolean isInstallSnapshotInitiated = true; - long replicatedToAllIndex = super.getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), - isInstallSnapshotInitiated), actor()); - context.setSnapshotCaptureInitiated(true); + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } } @@ -604,7 +628,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. @@ -612,8 +636,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), + snapshot.get().getLastIncludedIndex(), + snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, followerToSnapshot.incrementChunkIndex(), followerToSnapshot.getTotalChunks(), @@ -621,10 +645,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ).toSerializable(), actor() ); - LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - logName(), followerActor.path(), - followerToSnapshot.getChunkIndex(), - followerToSnapshot.getTotalChunks()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", + logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), + followerToSnapshot.getTotalChunks()); + } } } catch (IOException e) { LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e); @@ -855,4 +881,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public int followerLogSize() { return followerToLog.size(); } + + private static class SnapshotHolder { + private final long lastIncludedTerm; + private final long lastIncludedIndex; + private final ByteString snapshotBytes; + + SnapshotHolder(Snapshot snapshot) { + this.lastIncludedTerm = snapshot.getLastAppliedTerm(); + this.lastIncludedIndex = snapshot.getLastAppliedIndex(); + this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + } + + long getLastIncludedTerm() { + return lastIncludedTerm; + } + + long getLastIncludedIndex() { + return lastIncludedIndex; + } + + ByteString getSnapshotBytes() { + return snapshotBytes; + } + } }