X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformation.java;h=3952b386b2498a6e2fecf56d6f5a1cb3f4dda8fc;hp=07b6b617aaa862b472fb479be247479e1096f434;hb=1b1360ac337d23b9a586f62616eb278c3065eef0;hpb=4349b034606957d3e876b82b14a292e6739a986a diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 07b6b617aa..3952b386b2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -5,96 +5,342 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ + package org.opendaylight.controller.cluster.raft; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; + /** - * The state of the followers log as known by the Leader + * The state of the followers log as known by the Leader. + * + * @author Moiz Raja + * @author Thomas Pantelis */ -public interface FollowerLogInformation { +public final class FollowerLogInformation { + public static final long NO_INDEX = -1; + + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + + private final RaftActorContext context; + + private long nextIndex; + + private long matchIndex; + + private long lastReplicatedIndex = -1L; + + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); + + private short payloadVersion = -1; + + // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's + // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron + // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is + // HELIUM_VERSION. + private short raftVersion = RaftVersions.HELIUM_VERSION; + + private final PeerInfo peerInfo; + + private LeaderInstallSnapshotState installSnapshotState; + + private long slicedLogEntryIndex = NO_INDEX; /** - * Increment the value of the nextIndex - * @return + * Constructs an instance. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param matchIndex the initial match index. + * @param context the RaftActorContext. */ - long incrNextIndex(); + @VisibleForTesting + FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) { + this.nextIndex = context.getCommitIndex(); + this.matchIndex = matchIndex; + this.context = context; + this.peerInfo = Preconditions.checkNotNull(peerInfo); + } /** - * Decrement the value of the nextIndex - * @return + * Constructs an instance with no matching index. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param context the RaftActorContext. */ - long decrNextIndex(); + public FollowerLogInformation(final PeerInfo peerInfo, final RaftActorContext context) { + this(peerInfo, NO_INDEX, context); + } /** - * Sets the index of the next log entry for this follower. + * Increments the value of the follower's next index. * - * @param nextIndex + * @return the new value of nextIndex. + */ + @VisibleForTesting + long incrNextIndex() { + return nextIndex++; + } + + /** + * Decrements the value of the follower's next index. + * + * @return true if the next index was decremented, ie it was previously >= 0, false otherwise. + */ + public boolean decrNextIndex() { + if (nextIndex < 0) { + return false; + } + + nextIndex--; + return true; + } + + /** + * Sets the index of the follower's next log entry. + * + * @param nextIndex the new index. * @return true if the new index differed from the current index and the current index was updated, false * otherwise. */ - boolean setNextIndex(long nextIndex); + @SuppressWarnings("checkstyle:hiddenField") + public boolean setNextIndex(final long nextIndex) { + if (this.nextIndex != nextIndex) { + this.nextIndex = nextIndex; + return true; + } + + return false; + } /** - * Increment the value of the matchIndex - * @return + * Increments the value of the follower's match index. + * + * @return the new value of matchIndex. */ - long incrMatchIndex(); + public long incrMatchIndex() { + return matchIndex++; + } /** - * Sets the index of the highest log entry for this follower. + * Sets the index of the follower's highest log entry. * - * @param matchIndex + * @param matchIndex the new index. * @return true if the new index differed from the current index and the current index was updated, false * otherwise. */ - boolean setMatchIndex(long matchIndex); + @SuppressWarnings("checkstyle:hiddenField") + public boolean setMatchIndex(final long matchIndex) { + // If the new match index is the index of the entry currently being sliced, then we know slicing is complete + // and the follower received the entry and responded so clear the slicedLogEntryIndex + if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) { + slicedLogEntryIndex = NO_INDEX; + } + + if (this.matchIndex != matchIndex) { + this.matchIndex = matchIndex; + return true; + } + + return false; + } /** - * The identifier of the follower - * This could simply be the url of the remote actor + * Returns the identifier of the follower. + * + * @return the identifier of the follower. */ - String getId(); + public String getId() { + return peerInfo.getId(); + } /** - * for each server, index of the next log entry - * to send to that server (initialized to leader - * last log index + 1) + * Returns the index of the next log entry to send to the follower. + * + * @return index of the follower's next log entry. */ - long getNextIndex(); + public long getNextIndex() { + return nextIndex; + } /** - * for each server, index of highest log entry - * known to be replicated on server - * (initialized to 0, increases monotonically) + * Returns the index of highest log entry known to be replicated on the follower. + * + * @return the index of highest log entry. */ - long getMatchIndex(); + public long getMatchIndex() { + return matchIndex; + } /** - * Checks if the follower is active by comparing the last updated with the duration - * @return boolean + * Checks if the follower is active by comparing the time of the last activity with the election time out. The + * follower is active if some activity has occurred for the follower within the election time out interval. + * + * @return true if follower is active, false otherwise. */ - boolean isFollowerActive(); + public boolean isFollowerActive() { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + return stopwatch.isRunning() + && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis(); + } /** - * restarts the timeout clock of the follower + * Marks the follower as active. This should be called when some activity has occurred for the follower. */ - void markFollowerActive(); + public void markFollowerActive() { + if (stopwatch.isRunning()) { + stopwatch.reset(); + } + stopwatch.start(); + } /** - * This will stop the timeout clock + * Marks the follower as inactive. This should only be called from unit tests. */ - void markFollowerInActive(); + @VisibleForTesting + public void markFollowerInActive() { + if (stopwatch.isRunning()) { + stopwatch.stop(); + } + } + /** + * Returns the time since the last activity occurred for the follower. + * + * @return time in nanoseconds since the last activity from the follower. + */ + public long nanosSinceLastActivity() { + return stopwatch.elapsed(TimeUnit.NANOSECONDS); + } /** - * This will return the active time of follower, since it was last reset - * @return time in milliseconds + * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid + * sending duplicate message too frequently if the last replicate message was sent and no reply has been received + * yet within the current heart beat interval + * + * @return true if it is OK to replicate, false otherwise + */ + public boolean okToReplicate() { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + // Return false if we are trying to send duplicate data before the heartbeat interval + if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) + < context.getConfigParams().getHeartBeatInterval().toMillis()) { + return false; + } + + resetLastReplicated(); + return true; + } + + private void resetLastReplicated() { + lastReplicatedIndex = getNextIndex(); + if (lastReplicatedStopwatch.isRunning()) { + lastReplicatedStopwatch.reset(); + } + lastReplicatedStopwatch.start(); + } + + /** + * Returns the log entry payload data version of the follower. + * + * @return the payload data version. + */ + public short getPayloadVersion() { + return payloadVersion; + } + + /** + * Sets the payload data version of the follower. + * + * @param payloadVersion the payload data version. + */ + public void setPayloadVersion(final short payloadVersion) { + this.payloadVersion = payloadVersion; + } + + /** + * Returns the the raft version of the follower. + * + * @return the raft version of the follower. + */ + public short getRaftVersion() { + return raftVersion; + } + + /** + * Sets the raft version of the follower. + * + * @param raftVersion the raft version. + */ + public void setRaftVersion(final short raftVersion) { + this.raftVersion = raftVersion; + } + + /** + * Returns the LeaderInstallSnapshotState for the in progress install snapshot. + * + * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise. + */ + @Nullable + public LeaderInstallSnapshotState getInstallSnapshotState() { + return installSnapshotState; + } + + /** + * Sets the LeaderInstallSnapshotState when an install snapshot is initiated. + * + * @param state the LeaderInstallSnapshotState */ - long timeSinceLastActivity(); + public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) { + if (this.installSnapshotState == null) { + this.installSnapshotState = Preconditions.checkNotNull(state); + } + } /** - * This method checks if it is ok to replicate + * Clears the LeaderInstallSnapshotState when an install snapshot is complete. + */ + public void clearLeaderInstallSnapshotState() { + Preconditions.checkState(installSnapshotState != null); + installSnapshotState.close(); + installSnapshotState = null; + } + + /** + * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus + * needs to be sliced into smaller chunks. * - * @return true if it is ok to replicate + * @param index the log entry index or NO_INDEX to clear it */ - boolean okToReplicate(); + public void setSlicedLogEntryIndex(final long index) { + slicedLogEntryIndex = index; + } + + /** + * Return whether or not log entry slicing is currently in progress. + * + * @return true if slicing is currently in progress, false otherwise + */ + public boolean isLogEntrySlicingInProgress() { + return slicedLogEntryIndex != NO_INDEX; + } + + @Override + public String toString() { + return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" + + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + } }