X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformation.java;h=3952b386b2498a6e2fecf56d6f5a1cb3f4dda8fc;hp=f3de9835385eb0880bb567402f5bb493f8ad3a76;hb=1b1360ac337d23b9a586f62616eb278c3065eef0;hpb=fe4049d34de103016d11f3a9050853c6380646d3 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index f3de983538..3952b386b2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -8,58 +8,339 @@ package org.opendaylight.controller.cluster.raft; -import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; /** - * The state of the followers log as known by the Leader + * The state of the followers log as known by the Leader. + * + * @author Moiz Raja + * @author Thomas Pantelis */ -public interface FollowerLogInformation { +public final class FollowerLogInformation { + public static final long NO_INDEX = -1; + + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + + private final RaftActorContext context; + + private long nextIndex; + + private long matchIndex; + + private long lastReplicatedIndex = -1L; + + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); + + private short payloadVersion = -1; + + // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's + // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron + // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is + // HELIUM_VERSION. + private short raftVersion = RaftVersions.HELIUM_VERSION; + + private final PeerInfo peerInfo; + + private LeaderInstallSnapshotState installSnapshotState; + + private long slicedLogEntryIndex = NO_INDEX; /** - * Increment the value of the nextIndex - * @return + * Constructs an instance. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param matchIndex the initial match index. + * @param context the RaftActorContext. */ - public long incrNextIndex(); + @VisibleForTesting + FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) { + this.nextIndex = context.getCommitIndex(); + this.matchIndex = matchIndex; + this.context = context; + this.peerInfo = Preconditions.checkNotNull(peerInfo); + } /** - * Decrement the value of the nextIndex - * @return + * Constructs an instance with no matching index. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param context the RaftActorContext. */ - public long decrNextIndex(); + public FollowerLogInformation(final PeerInfo peerInfo, final RaftActorContext context) { + this(peerInfo, NO_INDEX, context); + } /** + * Increments the value of the follower's next index. * - * @param nextIndex + * @return the new value of nextIndex. */ - void setNextIndex(long nextIndex); + @VisibleForTesting + long incrNextIndex() { + return nextIndex++; + } /** - * Increment the value of the matchIndex - * @return + * Decrements the value of the follower's next index. + * + * @return true if the next index was decremented, ie it was previously >= 0, false otherwise. */ - public long incrMatchIndex(); + public boolean decrNextIndex() { + if (nextIndex < 0) { + return false; + } - public void setMatchIndex(long matchIndex); + nextIndex--; + return true; + } /** - * The identifier of the follower - * This could simply be the url of the remote actor + * Sets the index of the follower's next log entry. + * + * @param nextIndex the new index. + * @return true if the new index differed from the current index and the current index was updated, false + * otherwise. */ - public String getId(); + @SuppressWarnings("checkstyle:hiddenField") + public boolean setNextIndex(final long nextIndex) { + if (this.nextIndex != nextIndex) { + this.nextIndex = nextIndex; + return true; + } + + return false; + } /** - * for each server, index of the next log entry - * to send to that server (initialized to leader - * last log index + 1) + * Increments the value of the follower's match index. + * + * @return the new value of matchIndex. */ - public AtomicLong getNextIndex(); + public long incrMatchIndex() { + return matchIndex++; + } /** - * for each server, index of highest log entry - * known to be replicated on server - * (initialized to 0, increases monotonically) + * Sets the index of the follower's highest log entry. + * + * @param matchIndex the new index. + * @return true if the new index differed from the current index and the current index was updated, false + * otherwise. */ - public AtomicLong getMatchIndex(); + @SuppressWarnings("checkstyle:hiddenField") + public boolean setMatchIndex(final long matchIndex) { + // If the new match index is the index of the entry currently being sliced, then we know slicing is complete + // and the follower received the entry and responded so clear the slicedLogEntryIndex + if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) { + slicedLogEntryIndex = NO_INDEX; + } + + if (this.matchIndex != matchIndex) { + this.matchIndex = matchIndex; + return true; + } + + return false; + } + /** + * Returns the identifier of the follower. + * + * @return the identifier of the follower. + */ + public String getId() { + return peerInfo.getId(); + } + + /** + * Returns the index of the next log entry to send to the follower. + * + * @return index of the follower's next log entry. + */ + public long getNextIndex() { + return nextIndex; + } + + /** + * Returns the index of highest log entry known to be replicated on the follower. + * + * @return the index of highest log entry. + */ + public long getMatchIndex() { + return matchIndex; + } + + /** + * Checks if the follower is active by comparing the time of the last activity with the election time out. The + * follower is active if some activity has occurred for the follower within the election time out interval. + * + * @return true if follower is active, false otherwise. + */ + public boolean isFollowerActive() { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + return stopwatch.isRunning() + && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis(); + } + + /** + * Marks the follower as active. This should be called when some activity has occurred for the follower. + */ + public void markFollowerActive() { + if (stopwatch.isRunning()) { + stopwatch.reset(); + } + stopwatch.start(); + } + + /** + * Marks the follower as inactive. This should only be called from unit tests. + */ + @VisibleForTesting + public void markFollowerInActive() { + if (stopwatch.isRunning()) { + stopwatch.stop(); + } + } + + /** + * Returns the time since the last activity occurred for the follower. + * + * @return time in nanoseconds since the last activity from the follower. + */ + public long nanosSinceLastActivity() { + return stopwatch.elapsed(TimeUnit.NANOSECONDS); + } + + /** + * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid + * sending duplicate message too frequently if the last replicate message was sent and no reply has been received + * yet within the current heart beat interval + * + * @return true if it is OK to replicate, false otherwise + */ + public boolean okToReplicate() { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + // Return false if we are trying to send duplicate data before the heartbeat interval + if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) + < context.getConfigParams().getHeartBeatInterval().toMillis()) { + return false; + } + + resetLastReplicated(); + return true; + } + + private void resetLastReplicated() { + lastReplicatedIndex = getNextIndex(); + if (lastReplicatedStopwatch.isRunning()) { + lastReplicatedStopwatch.reset(); + } + lastReplicatedStopwatch.start(); + } + + /** + * Returns the log entry payload data version of the follower. + * + * @return the payload data version. + */ + public short getPayloadVersion() { + return payloadVersion; + } + + /** + * Sets the payload data version of the follower. + * + * @param payloadVersion the payload data version. + */ + public void setPayloadVersion(final short payloadVersion) { + this.payloadVersion = payloadVersion; + } + + /** + * Returns the the raft version of the follower. + * + * @return the raft version of the follower. + */ + public short getRaftVersion() { + return raftVersion; + } + + /** + * Sets the raft version of the follower. + * + * @param raftVersion the raft version. + */ + public void setRaftVersion(final short raftVersion) { + this.raftVersion = raftVersion; + } + + /** + * Returns the LeaderInstallSnapshotState for the in progress install snapshot. + * + * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise. + */ + @Nullable + public LeaderInstallSnapshotState getInstallSnapshotState() { + return installSnapshotState; + } + + /** + * Sets the LeaderInstallSnapshotState when an install snapshot is initiated. + * + * @param state the LeaderInstallSnapshotState + */ + public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) { + if (this.installSnapshotState == null) { + this.installSnapshotState = Preconditions.checkNotNull(state); + } + } + + /** + * Clears the LeaderInstallSnapshotState when an install snapshot is complete. + */ + public void clearLeaderInstallSnapshotState() { + Preconditions.checkState(installSnapshotState != null); + installSnapshotState.close(); + installSnapshotState = null; + } + + /** + * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus + * needs to be sliced into smaller chunks. + * + * @param index the log entry index or NO_INDEX to clear it + */ + public void setSlicedLogEntryIndex(final long index) { + slicedLogEntryIndex = index; + } + + /** + * Return whether or not log entry slicing is currently in progress. + * + * @return true if slicing is currently in progress, false otherwise + */ + public boolean isLogEntrySlicingInProgress() { + return slicedLogEntryIndex != NO_INDEX; + } + @Override + public String toString() { + return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" + + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + } }