X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformation.java;h=a76d6a29c272db22c34ff66c23769384305f19e9;hp=5f185cbb9c872efe200522c3b93b554623157dd7;hb=refs%2Fchanges%2F98%2F82498%2F1;hpb=cf5be659d906cc80d52647cb516bbab435156742 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 5f185cbb9c..a76d6a29c2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -5,47 +5,373 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft; -import java.util.concurrent.atomic.AtomicLong; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; /** - * The state of the followers log as known by the Leader + * The state of the followers log as known by the Leader. + * + * @author Moiz Raja + * @author Thomas Pantelis */ -public interface FollowerLogInformation { +public final class FollowerLogInformation { + public static final long NO_INDEX = -1; + + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + + private final RaftActorContext context; + + private long nextIndex; + + private long matchIndex; + + private long lastReplicatedIndex = -1L; + + private long sentCommitIndex = -1L; + + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); + + private short payloadVersion = -1; + + // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's + // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron + // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is + // HELIUM_VERSION. + private short raftVersion = RaftVersions.HELIUM_VERSION; + + private final PeerInfo peerInfo; + + private LeaderInstallSnapshotState installSnapshotState; + + private long slicedLogEntryIndex = NO_INDEX; + + private boolean needsLeaderAddress; + + /** + * Constructs an instance. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param matchIndex the initial match index. + * @param context the RaftActorContext. + */ + @VisibleForTesting + FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) { + this.nextIndex = context.getCommitIndex(); + this.matchIndex = matchIndex; + this.context = context; + this.peerInfo = requireNonNull(peerInfo); + } + + /** + * Constructs an instance with no matching index. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param context the RaftActorContext. + */ + public FollowerLogInformation(final PeerInfo peerInfo, final RaftActorContext context) { + this(peerInfo, NO_INDEX, context); + } + + /** + * Increments the value of the follower's next index. + * + * @return the new value of nextIndex. + */ + @VisibleForTesting + long incrNextIndex() { + return nextIndex++; + } + + /** + * Decrements the value of the follower's next index, taking into account its reported last log index. + * + * @param followerLastIndex follower's last reported index. + * @return true if the next index was decremented, i.e. it was previously >= 0, false otherwise. + */ + public boolean decrNextIndex(final long followerLastIndex) { + if (nextIndex < 0) { + return false; + } + + if (followerLastIndex >= 0 && nextIndex > followerLastIndex) { + // If the follower's last log index is lower than nextIndex, jump directly to it, so we converge + // on a common index more quickly. + nextIndex = followerLastIndex; + } else { + nextIndex--; + } + return true; + } + + /** + * Sets the index of the follower's next log entry. + * + * @param nextIndex the new index. + * @return true if the new index differed from the current index and the current index was updated, false + * otherwise. + */ + @SuppressWarnings("checkstyle:hiddenField") + public boolean setNextIndex(final long nextIndex) { + if (this.nextIndex != nextIndex) { + this.nextIndex = nextIndex; + return true; + } + + return false; + } + + /** + * Increments the value of the follower's match index. + * + * @return the new value of matchIndex. + */ + public long incrMatchIndex() { + return matchIndex++; + } + + /** + * Sets the index of the follower's highest log entry. + * + * @param matchIndex the new index. + * @return true if the new index differed from the current index and the current index was updated, false + * otherwise. + */ + @SuppressWarnings("checkstyle:hiddenField") + public boolean setMatchIndex(final long matchIndex) { + // If the new match index is the index of the entry currently being sliced, then we know slicing is complete + // and the follower received the entry and responded so clear the slicedLogEntryIndex + if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) { + slicedLogEntryIndex = NO_INDEX; + } + + if (this.matchIndex != matchIndex) { + this.matchIndex = matchIndex; + return true; + } + + return false; + } + + /** + * Returns the identifier of the follower. + * + * @return the identifier of the follower. + */ + public String getId() { + return peerInfo.getId(); + } + + /** + * Returns the index of the next log entry to send to the follower. + * + * @return index of the follower's next log entry. + */ + public long getNextIndex() { + return nextIndex; + } + + /** + * Returns the index of highest log entry known to be replicated on the follower. + * + * @return the index of highest log entry. + */ + public long getMatchIndex() { + return matchIndex; + } + + /** + * Checks if the follower is active by comparing the time of the last activity with the election time out. The + * follower is active if some activity has occurred for the follower within the election time out interval. + * + * @return true if follower is active, false otherwise. + */ + public boolean isFollowerActive() { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + return stopwatch.isRunning() + && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis(); + } + + /** + * Marks the follower as active. This should be called when some activity has occurred for the follower. + */ + public void markFollowerActive() { + if (stopwatch.isRunning()) { + stopwatch.reset(); + } + stopwatch.start(); + } + + /** + * Marks the follower as inactive. This should only be called from unit tests. + */ + @VisibleForTesting + public void markFollowerInActive() { + if (stopwatch.isRunning()) { + stopwatch.stop(); + } + } + + /** + * Returns the time since the last activity occurred for the follower. + * + * @return time in nanoseconds since the last activity from the follower. + */ + public long nanosSinceLastActivity() { + return stopwatch.elapsed(TimeUnit.NANOSECONDS); + } /** - * Increment the value of the nextIndex - * @return + * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid + * sending duplicate message too frequently if the last replicate message was sent and no reply has been received + * yet within the current heart beat interval + * + * @param commitIndex current commitIndex + * @return true if it is OK to replicate, false otherwise */ - public long incrNextIndex(); + public boolean okToReplicate(final long commitIndex) { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes + // also our commitIndex, as followers need to be told of new commitIndex as soon as possible. + if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex) + && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) + < context.getConfigParams().getHeartBeatInterval().toMillis()) { + return false; + } + + resetLastReplicated(); + return true; + } + + private void resetLastReplicated() { + lastReplicatedIndex = getNextIndex(); + if (lastReplicatedStopwatch.isRunning()) { + lastReplicatedStopwatch.reset(); + } + lastReplicatedStopwatch.start(); + } + + /** + * Returns the log entry payload data version of the follower. + * + * @return the payload data version. + */ + public short getPayloadVersion() { + return payloadVersion; + } /** - * Increment the value of the matchIndex - * @return + * Sets the payload data version of the follower. + * + * @param payloadVersion the payload data version. */ - public long incrMatchIndex(); + public void setPayloadVersion(final short payloadVersion) { + this.payloadVersion = payloadVersion; + } /** - * The identifier of the follower - * This could simply be the url of the remote actor + * Returns the the raft version of the follower. + * + * @return the raft version of the follower. */ - public String getId(); + public short getRaftVersion() { + return raftVersion; + } /** - * for each server, index of the next log entry - * to send to that server (initialized to leader - * last log index + 1) + * Sets the raft version of the follower. + * + * @param raftVersion the raft version. */ - public AtomicLong getNextIndex(); + public void setRaftVersion(final short raftVersion) { + this.raftVersion = raftVersion; + } /** - * for each server, index of highest log entry - * known to be replicated on server - * (initialized to 0, increases monotonically) + * Returns the LeaderInstallSnapshotState for the in progress install snapshot. + * + * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise. */ - public AtomicLong getMatchIndex(); + public @Nullable LeaderInstallSnapshotState getInstallSnapshotState() { + return installSnapshotState; + } + + /** + * Sets the LeaderInstallSnapshotState when an install snapshot is initiated. + * + * @param state the LeaderInstallSnapshotState + */ + public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) { + if (this.installSnapshotState == null) { + this.installSnapshotState = requireNonNull(state); + } + } + + /** + * Clears the LeaderInstallSnapshotState when an install snapshot is complete. + */ + public void clearLeaderInstallSnapshotState() { + checkState(installSnapshotState != null); + installSnapshotState.close(); + installSnapshotState = null; + } + + /** + * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus + * needs to be sliced into smaller chunks. + * + * @param index the log entry index or NO_INDEX to clear it + */ + public void setSlicedLogEntryIndex(final long index) { + slicedLogEntryIndex = index; + } + + /** + * Return whether or not log entry slicing is currently in progress. + * + * @return true if slicing is currently in progress, false otherwise + */ + public boolean isLogEntrySlicingInProgress() { + return slicedLogEntryIndex != NO_INDEX; + } + + public void setNeedsLeaderAddress(final boolean value) { + needsLeaderAddress = value; + } + + public @Nullable String needsLeaderAddress(final String leaderId) { + return needsLeaderAddress ? context.getPeerAddress(leaderId) : null; + } + + public boolean hasStaleCommitIndex(final long commitIndex) { + return sentCommitIndex != commitIndex; + } + public void setSentCommitIndex(final long commitIndex) { + sentCommitIndex = commitIndex; + } + @Override + public String toString() { + return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex + + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + } }