X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=10c1a156f9cbbdb528076ad70fa6ecbff48052dc;hb=c274386ae1d71bb3004d69934076e1cf73508359;hp=fab1714989a99407f3d51452d33ef313ac2c14e8;hpb=660c3e22ca97bc613ea6f6288503620bba6fb233;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index fab1714989..10c1a156f9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -50,7 +51,8 @@ import scala.concurrent.duration.FiniteDuration; /** * The behavior of a RaftActor when it is in the Leader state - *

+ * + *

* Leaders: *

@@ -290,17 +292,42 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - // Now figure out if this reply warrants a change in the commitIndex - // If there exists an N such that N > commitIndex, a majority - // of matchIndex[i] ≥ N, and log[N].term == currentTerm: - // set commitIndex = N (§5.3, §5.4). if (log.isTraceEnabled()) { log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); } + possiblyUpdateCommitIndex(); + + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + + return this; + } + + private void possiblyUpdateCommitIndex() { + // Figure out if we can update the commitIndex as follows: + // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N, + // and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). for (long index = context.getCommitIndex() + 1; ; index++) { - int replicatedCount = 1; + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); + if (replicatedLogEntry == null) { + log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), index, context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().size()); + break; + } + + // Count our entry if it has been persisted. + int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1; + + if (replicatedCount == 0) { + // We don't commit and apply a log entry until we've gotten the ack from our local persistence, + // even though there *shouldn't* be any issue with updating the commit index if we get a consensus + // amongst the followers w/o the local persistence ack. 
+ break; + } log.trace("{}: checking Nth index {}", logName(), index); for (FollowerLogInformation info : followerToLog.values()) { @@ -319,14 +346,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } if (replicatedCount >= minReplicationCount) { - ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); - if (replicatedLogEntry == null) { - log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", - logName(), index, context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().size()); - break; - } - // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: // "Raft never commits log entries from previous terms by counting replicas". // However we keep looping so we can make progress when new entries in the current term @@ -348,11 +367,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { - if (log.isDebugEnabled()) { - log.debug( - "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", - logName(), followerId, context.getCommitIndex(), context.getLastApplied()); - } + log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(), + context.getCommitIndex(), context.getLastApplied()); applyLogToStateMachine(context.getCommitIndex()); } @@ -360,11 +376,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } - - //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); - - return this; } private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation, @@ -447,6 +458,8 @@ public abstract class AbstractLeader 
extends AbstractRaftActorBehavior { replicate((Replicate) message); } else if (message instanceof InstallSnapshotReply) { handleInstallSnapshotReply((InstallSnapshotReply) message); + } else if (message instanceof CheckConsensusReached) { + possiblyUpdateCommitIndex(); } else { return super.handleMessage(sender, message); } @@ -553,8 +566,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), - replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); + log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(), + replicate.isSendImmediate()); // Create a tracker entry we will use this later to notify the // client actor @@ -571,7 +585,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(logIndex); } - if (!followerToLog.isEmpty()) { + if (replicate.isSendImmediate() && !followerToLog.isEmpty()) { sendAppendEntries(0, false); } } @@ -592,8 +606,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { /** * This method checks if any update needs to be sent to the given follower. This includes append log entries, * sending next snapshot chunk, and initiating a snapshot. - * - * @return true if any update is sent, false otherwise */ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, boolean sendHeartbeat, boolean isHeartbeat) { @@ -699,7 +711,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { /** * Initiates a snapshot capture to install on a follower. - *

+ * + *

* Install Snapshot works as follows * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor. * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to @@ -725,16 +738,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState. sendSnapshotChunk(followerActor, followerLogInfo); return true; - } else { - boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), - this.getReplicatedToAllIndex(), followerId); - if (captureInitiated) { - followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState( - context.getConfigParams().getSnapshotChunkSize(), logName())); - } + } - return captureInitiated; + boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); + if (captureInitiated) { + followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState( + context.getConfigParams().getSnapshotChunkSize(), logName())); } + + return captureInitiated; } private boolean canInstallSnapshot(long nextIndex) {