+ @SuppressWarnings("checkstyle:hiddenField")
+ public boolean setMatchIndex(final long matchIndex) {
+ // If the new match index is the index of the entry currently being sliced, then we know slicing is complete
+ // and the follower received the entry and responded so clear the slicedLogEntryIndex
+ if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) {
+ slicedLogEntryIndex = NO_INDEX;
+ }
+
+ if (this.matchIndex != matchIndex) {
+ this.matchIndex = matchIndex;
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns the identifier of the follower.
+ *
+ * @return the identifier of the follower.
+ */
+ public String getId() {
+ return peerInfo.getId();
+ }
+
+ /**
+ * Returns the index of the next log entry to send to the follower.
+ *
+ * @return index of the follower's next log entry.
+ */
+ public long getNextIndex() {
+ return nextIndex;
+ }
+
+ /**
+ * Returns the index of highest log entry known to be replicated on the follower.
+ *
+ * @return the index of highest log entry.
+ */
+ public long getMatchIndex() {
+ return matchIndex;
+ }
+
+ /**
+ * Checks if the follower is active by comparing the time of the last activity with the election time out. The
+ * follower is active if some activity has occurred for the follower within the election time out interval.
+ *
+ * @return true if follower is active, false otherwise.
+ */
+ public boolean isFollowerActive() {
+ if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ return false;
+ }
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ return stopwatch.isRunning()
+ && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis();
+ }
+
+ /**
+ * Marks the follower as active. This should be called when some activity has occurred for the follower.
+ */
+ public void markFollowerActive() {
+ if (stopwatch.isRunning()) {
+ stopwatch.reset();
+ }
+ stopwatch.start();
+ }
+
+ /**
+ * Marks the follower as inactive. This should only be called from unit tests.
+ */
+ @VisibleForTesting
+ public void markFollowerInActive() {
+ if (stopwatch.isRunning()) {
+ stopwatch.stop();
+ }
+ }
+
+ /**
+ * Returns the time since the last activity occurred for the follower.
+ *
+ * @return time in nanoseconds since the last activity from the follower.
+ */
+ public long nanosSinceLastActivity() {
+ return stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid
+ * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
+ * yet within the current heart beat interval
+ *
+ * @param commitIndex current commitIndex
+ * @return true if it is OK to replicate, false otherwise
+ */
+ public boolean okToReplicate(final long commitIndex) {
+ if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ return false;
+ }
+
+ // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
+ // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
+ if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
+ && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ < context.getConfigParams().getHeartBeatInterval().toMillis()) {
+ return false;
+ }
+
+ resetLastReplicated();
+ return true;
+ }
+
+ private void resetLastReplicated() {
+ lastReplicatedIndex = getNextIndex();
+ if (lastReplicatedStopwatch.isRunning()) {
+ lastReplicatedStopwatch.reset();
+ }
+ lastReplicatedStopwatch.start();
+ }
+
+ /**
+ * Returns the log entry payload data version of the follower.
+ *
+ * @return the payload data version.
+ */
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
+
+ /**
+ * Sets the payload data version of the follower.
+ *
+ * @param payloadVersion the payload data version.
+ */
+ public void setPayloadVersion(final short payloadVersion) {
+ this.payloadVersion = payloadVersion;
+ }
+
+ /**
+ * Returns the the raft version of the follower.
+ *
+ * @return the raft version of the follower.
+ */
+ public short getRaftVersion() {
+ return raftVersion;
+ }
+
+ /**
+ * Sets the raft version of the follower.
+ *
+ * @param raftVersion the raft version.
+ */
+ public void setRaftVersion(final short raftVersion) {
+ checkArgument(raftVersion >= RaftVersions.FLUORINE_VERSION, "Unexpected version %s", raftVersion);
+ this.raftVersion = raftVersion;
+ }
+
+ /**
+ * Returns the LeaderInstallSnapshotState for the in progress install snapshot.
+ *
+ * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise.
+ */
+ public @Nullable LeaderInstallSnapshotState getInstallSnapshotState() {
+ return installSnapshotState;
+ }
+
+ /**
+ * Sets the LeaderInstallSnapshotState when an install snapshot is initiated.
+ *
+ * @param state the LeaderInstallSnapshotState
+ */
+ public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) {
+ if (installSnapshotState == null) {
+ installSnapshotState = requireNonNull(state);
+ }
+ }
+
+ /**
+ * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
+ */
+ public void clearLeaderInstallSnapshotState() {
+ checkState(installSnapshotState != null);
+ installSnapshotState.close();
+ installSnapshotState = null;
+ }
+
+ /**
+ * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus
+ * needs to be sliced into smaller chunks.
+ *
+ * @param index the log entry index or NO_INDEX to clear it
+ */
+ public void setSlicedLogEntryIndex(final long index) {
+ slicedLogEntryIndex = index;
+ }
+
+ /**
+ * Return whether or not log entry slicing is currently in progress.
+ *
+ * @return true if slicing is currently in progress, false otherwise
+ */
+ public boolean isLogEntrySlicingInProgress() {
+ return slicedLogEntryIndex != NO_INDEX;
+ }
+
+ public void setNeedsLeaderAddress(final boolean value) {
+ needsLeaderAddress = value;
+ }
+
+ public @Nullable String needsLeaderAddress(final String leaderId) {
+ return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
+ }
+
+ public boolean hasStaleCommitIndex(final long commitIndex) {
+ return sentCommitIndex != commitIndex;
+ }