X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=d97905cf11e4f7174bc85505de27b1687da347d2;hp=31bf99c2dc2bc9f5564c256294c605f6af1bd482;hb=fa96da71c5ab10973a9f93c2e8b35494b96fd7ed;hpb=cd2a6fa0d8fa6281be28d3c7b9828ecf4e932811 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 31bf99c2dc..d97905cf11 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -291,17 +292,47 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - // Now figure out if this reply warrants a change in the commitIndex - // If there exists an N such that N > commitIndex, a majority - // of matchIndex[i] ≥ N, and log[N].term == currentTerm: - // set commitIndex = N (§5.3, §5.4). if (log.isTraceEnabled()) { log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); } + possiblyUpdateCommitIndex(); + + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + + return this; + } + + private void possiblyUpdateCommitIndex() { + // Figure out if we can update the the commitIndex as follows: + // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N, + // and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). for (long index = context.getCommitIndex() + 1; ; index++) { - int replicatedCount = 1; + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); + if (replicatedLogEntry == null) { + log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), index, context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().size()); + break; + } + + // Count our entry if it has been persisted. + int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1; + + if (replicatedCount == 0) { + // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally + // we should be able to update the commit index if we get a consensus amongst the followers + // w/o the local persistence ack however this can cause timing issues with snapshot capture + // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's + // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding + // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't + // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState + // message in lieu of synchronously updating lastAppliedIndex and applying to state. + break; + } log.trace("{}: checking Nth index {}", logName(), index); for (FollowerLogInformation info : followerToLog.values()) { @@ -320,14 +351,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } if (replicatedCount >= minReplicationCount) { - ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); - if (replicatedLogEntry == null) { - log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", - logName(), index, context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().size()); - break; - } - // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: // "Raft never commits log entries from previous terms by counting replicas". // However we keep looping so we can make progress when new entries in the current term @@ -349,11 +372,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { - if (log.isDebugEnabled()) { - log.debug( - "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", - logName(), followerId, context.getCommitIndex(), context.getLastApplied()); - } + log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(), + context.getCommitIndex(), context.getLastApplied()); applyLogToStateMachine(context.getCommitIndex()); } @@ -361,11 +381,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } - - //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); - - return this; } private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation, @@ -448,6 +463,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { replicate((Replicate) message); } else if (message instanceof InstallSnapshotReply) { handleInstallSnapshotReply((InstallSnapshotReply) message); + } else if (message instanceof CheckConsensusReached) { + possiblyUpdateCommitIndex(); } else { return super.handleMessage(sender, message); }