X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=578989599752f41c0fefd635c879effbc187c357;hb=bbc8a16efdc6bfa0d742b73af3374a33a12e2a1c;hp=2eb3b32c6f13d0c01db85fe7fa141a1eb6c2aac9;hpb=8274ae55bc9eba37035a62f49d992f85391524ed;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 2eb3b32c6f..5789895997 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -33,6 +34,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -88,11 +90,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected final int minIsolatedLeaderPeerCount; - private Optional snapshot; + private Optional snapshot; public AbstractLeader(RaftActorContext context) { super(context, RaftState.Leader); + setLeaderPayloadVersion(context.getPayloadVersion()); + final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = @@ -138,8 +142,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - void setSnapshot(Optional snapshot) { - this.snapshot = snapshot; + void setSnapshot(@Nullable Snapshot snapshot) { + if(snapshot != null) { + this.snapshot = Optional.of(new SnapshotHolder(snapshot)); + } else { + this.snapshot = Optional.absent(); + } } @Override @@ -188,7 +196,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); - if(followerLastLogIndex < 0 || (followersLastLogEntry != null && + if(appendEntriesReply.isForceInstallSnapshot()) { + // Reset the followers match and next index. This is to signal that this follower has nothing + // in common with this Leader and so would require a snapshot to be installed + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + // Force initiate a snapshot capture + initiateCaptureSnapshot(followerId); + } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null && followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { // The follower's log is empty or the last entry is present in the leader's journal // and the terms match so the follower is just behind the leader's journal from @@ -325,7 +341,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return switchBehavior(new Follower(context)); + return internalSwitchBehavior(RaftState.Follower); } } @@ -337,7 +353,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else if(message instanceof SendInstallSnapshot) { // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); } else if (message instanceof Replicate) { @@ -380,10 +396,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ); } - followerLogInformation.setMatchIndex( - context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation.setNextIndex( - context.getReplicatedLog().getSnapshotIndex() + 1); + long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + followerLogInformation.setMatchIndex(followerMatchIndex); + followerLogInformation.setNextIndex(followerMatchIndex + 1); mapFollowerToSnapshot.remove(followerId); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", @@ -393,7 +408,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory - setSnapshot(Optional.absent()); + setSnapshot(null); } wasLastChunk = true; @@ -444,10 +459,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logIndex) ); - if (followerToLog.isEmpty()) { + boolean applyModificationToState = followerToLog.isEmpty() + || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); + + if(applyModificationToState){ context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); - } else { + } + + if (!followerToLog.isEmpty()) { sendAppendEntries(0, false); } } @@ -527,7 +547,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntries = true; - initiateCaptureSnapshot(followerId, followerNextIndex); + if (canInstallSnapshot(followerNextIndex)) { + initiateCaptureSnapshot(followerId); + } } else if(sendHeartbeat) { // we send an AppendEntries, even if the follower is inactive @@ -571,27 +593,32 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply) * then send the existing snapshot in chunks to the follower. * @param followerId - * @param followerNextIndex */ - private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { - if (!context.getReplicatedLog().isPresent(followerNextIndex) && - context.getReplicatedLog().isInSnapshot(followerNextIndex)) { - - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot. - // This could happen if another follower needs an install when one is going on. - final ActorSelection followerActor = context.getPeerActorSelection(followerId); - sendSnapshotChunk(followerActor, followerId); + private void initiateCaptureSnapshot(String followerId) { + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot. + // This could happen if another follower needs an install when one is going on. + final ActorSelection followerActor = context.getPeerActorSelection(followerId); + sendSnapshotChunk(followerActor, followerId); - } else { - context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), - this.getReplicatedToAllIndex(), followerId); - } + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } + private boolean canInstallSnapshot(long nextIndex){ + // If the follower's nextIndex is -1 then we might as well send it a snapshot + // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present + // in the snapshot + return (nextIndex == -1 || + (!context.getReplicatedLog().isPresent(nextIndex) + && context.getReplicatedLog().isInSnapshot(nextIndex))); + + } + private void sendInstallSnapshot() { LOG.debug("{}: sendInstallSnapshot", logName()); @@ -600,9 +627,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { long nextIndex = e.getValue().getNextIndex(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { + if (canInstallSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, e.getKey()); } } @@ -616,7 +641,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. @@ -624,8 +649,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), + snapshot.get().getLastIncludedIndex(), + snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, followerToSnapshot.incrementChunkIndex(), followerToSnapshot.getTotalChunks(), @@ -869,4 +894,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public int followerLogSize() { return followerToLog.size(); } + + private static class SnapshotHolder { + private final long lastIncludedTerm; + private final long lastIncludedIndex; + private final ByteString snapshotBytes; + + SnapshotHolder(Snapshot snapshot) { + this.lastIncludedTerm = snapshot.getLastAppliedTerm(); + this.lastIncludedIndex = snapshot.getLastAppliedIndex(); + this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + } + + long getLastIncludedTerm() { + return lastIncludedTerm; + } + + long getLastIncludedIndex() { + return lastIncludedIndex; + } + + ByteString getSnapshotBytes() { + return snapshotBytes; + } + } }