X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=622d59e41a062346aa55eaa82e98f4255b592ee6;hp=2c433f90076b20cda5002b819bf08b4dd7211104;hb=3bc363a69d6d48709f7fd741ef018ecd75b8f99b;hpb=76adfd83e0cade28adbda66e0d8c0950601b8e44 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 2c433f9007..622d59e41a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,8 +14,6 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collection; @@ -26,13 +24,16 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -77,7 +78,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // This would be passed as the hash code of the last chunk when sending the first chunk public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - private final Map followerToLog; + private final Map followerToLog = new HashMap<>(); private final Map mapFollowerToSnapshot = new HashMap<>(); private Cancellable heartbeatSchedule = null; @@ -88,19 +89,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected final int minIsolatedLeaderPeerCount; - private Optional snapshot; + private Optional snapshot; public AbstractLeader(RaftActorContext context) { super(context, RaftState.Leader); - final Builder ftlBuilder = ImmutableMap.builder(); + setLeaderPayloadVersion(context.getPayloadVersion()); + for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context); - ftlBuilder.put(followerId, followerLogInformation); + followerToLog.put(followerId, followerLogInformation); } - followerToLog = ftlBuilder.build(); leaderId = context.getId(); @@ -137,9 +138,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } + public void addFollower(String followerId, FollowerState followerState) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context); + followerLogInformation.setFollowerState(followerState); + followerToLog.put(followerId, followerLogInformation); + } + @VisibleForTesting - void setSnapshot(Optional snapshot) { - this.snapshot = snapshot; + void setSnapshot(@Nullable Snapshot snapshot) { + if(snapshot != null) { + this.snapshot = Optional.of(new SnapshotHolder(snapshot)); + } else { + this.snapshot = Optional.absent(); + } } @Override @@ -178,26 +189,41 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } followerLogInformation.markFollowerActive(); + followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); boolean updated = false; if (appendEntriesReply.isSuccess()) { - updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); - updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; - - if(updated && LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(), - followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); - } + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about + long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); + ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); + if(appendEntriesReply.isForceInstallSnapshot()) { + // Reset the followers match and next index. This is to signal that this follower has nothing + // in common with this Leader and so would require a snapshot to be installed + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + // Force initiate a snapshot capture + initiateCaptureSnapshot(followerId); + } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null && + followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { + // The follower's log is empty or the last entry is present in the leader's journal + // and the terms match so the follower is just behind the leader's journal from + // the last snapshot, if any. We'll catch up the follower quickly by starting at the + // follower's last log index. + + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); + } else { + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about. - followerLogInformation.decrNextIndex(); + followerLogInformation.decrNextIndex(); + } } // Now figure out if this reply warrants a change in the commitIndex @@ -243,6 +269,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return this; } + private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation, + AppendEntriesReply appendEntriesReply) { + boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); + updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; + + if(updated && LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", + logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); + } + return updated; + } + private void purgeInMemoryLog() { //find the lowest index across followers which has been replicated to all. // lastApplied if there are no followers, so that we keep clearing the log for single-node @@ -305,7 +344,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return switchBehavior(new Follower(context)); + return internalSwitchBehavior(RaftState.Follower); } } @@ -317,7 +356,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else if(message instanceof SendInstallSnapshot) { // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); } else if (message instanceof Replicate) { @@ -360,20 +399,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ); } - followerLogInformation.setMatchIndex( - context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation.setNextIndex( - context.getReplicatedLog().getSnapshotIndex() + 1); + long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + followerLogInformation.setMatchIndex(followerMatchIndex); + followerLogInformation.setNextIndex(followerMatchIndex + 1); mapFollowerToSnapshot.remove(followerId); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", - logName(), followerId, followerLogInformation.getMatchIndex(), - followerLogInformation.getNextIndex()); + logName(), followerId, followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory - setSnapshot(Optional.absent()); + setSnapshot(null); } wasLastChunk = true; @@ -424,10 +462,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logIndex) ); - if (followerToLog.isEmpty()) { + boolean applyModificationToState = followerToLog.isEmpty() + || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); + + if(applyModificationToState){ context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); - } else { + } + + if (!followerToLog.isEmpty()) { sendAppendEntries(0, false); } } @@ -460,7 +503,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); boolean sendAppendEntries = false; - List entries = Collections.EMPTY_LIST; + List entries = Collections.emptyList(); if (mapFollowerToSnapshot.get(followerId) != null) { // if install snapshot is in process , then sent next chunk if possible @@ -475,8 +518,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { - LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); + LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); } if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { @@ -484,9 +527,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), followerNextIndex, followerId); - // FIXME : Sending one entry at a time if(followerLogInformation.okToReplicate()) { - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + // Try to send all the entries in the journal but not exceeding the max data size + // for a single AppendEntries message. + int maxEntries = (int) context.getReplicatedLog().size(); + entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries, + context.getConfigParams().getSnapshotChunkSize()); sendAppendEntries = true; } } else if (isFollowerActive && followerNextIndex >= 0 && @@ -504,7 +550,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntries = true; - initiateCaptureSnapshot(followerId, followerNextIndex); + if (canInstallSnapshot(followerNextIndex)) { + initiateCaptureSnapshot(followerId); + } } else if(sendHeartbeat) { // we send an AppendEntries, even if the follower is inactive @@ -526,7 +574,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), super.getReplicatedToAllIndex()); + context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion()); if(!entries.isEmpty() || LOG.isTraceEnabled()) { LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId, @@ -548,38 +596,43 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply) * then send the existing snapshot in chunks to the follower. * @param followerId - * @param followerNextIndex */ - private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { - if (!context.getReplicatedLog().isPresent(followerNextIndex) && - context.getReplicatedLog().isInSnapshot(followerNextIndex)) { + public void initiateCaptureSnapshot(String followerId) { + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot. + // This could happen if another follower needs an install when one is going on. + final ActorSelection followerActor = context.getPeerActorSelection(followerId); + sendSnapshotChunk(followerActor, followerId); - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot. - // This could happen if another follower needs an install when one is going on. - final ActorSelection followerActor = context.getPeerActorSelection(followerId); - sendSnapshotChunk(followerActor, followerId); - - } else { - context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), - this.getReplicatedToAllIndex(), followerId); - } + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } + private boolean canInstallSnapshot(long nextIndex){ + // If the follower's nextIndex is -1 then we might as well send it a snapshot + // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present + // in the snapshot + return (nextIndex == -1 || + (!context.getReplicatedLog().isPresent(nextIndex) + && context.getReplicatedLog().isInSnapshot(nextIndex))); + + } + private void sendInstallSnapshot() { LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); + FollowerLogInformation followerLogInfo = e.getValue(); if (followerActor != null) { long nextIndex = e.getValue().getNextIndex(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { + if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED || + canInstallSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, e.getKey()); } } @@ -593,7 +646,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. @@ -601,8 +654,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), + snapshot.get().getLastIncludedIndex(), + snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, followerToSnapshot.incrementChunkIndex(), followerToSnapshot.getTotalChunks(), @@ -846,4 +899,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public int followerLogSize() { return followerToLog.size(); } + + private static class SnapshotHolder { + private final long lastIncludedTerm; + private final long lastIncludedIndex; + private final ByteString snapshotBytes; + + SnapshotHolder(Snapshot snapshot) { + this.lastIncludedTerm = snapshot.getLastAppliedTerm(); + this.lastIncludedIndex = snapshot.getLastAppliedIndex(); + this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + } + + long getLastIncludedTerm() { + return lastIncludedTerm; + } + + long getLastIncludedIndex() { + return lastIncludedIndex; + } + + ByteString getSnapshotBytes() { + return snapshotBytes; + } + } }