/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Stopwatch;
16 import java.util.concurrent.TimeUnit;
17 import org.eclipse.jdt.annotation.NonNull;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
/**
 * The state of the follower's log as known by the Leader.
 *
 * @author Thomas Pantelis
 */
27 public final class FollowerLogInformation {
28 public static final long NO_INDEX = -1;
30 private final Stopwatch stopwatch = Stopwatch.createUnstarted();
32 private final RaftActorContext context;
34 private long nextIndex;
36 private long matchIndex;
38 private long lastReplicatedIndex = -1L;
40 private long sentCommitIndex = -1L;
42 private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
44 private short payloadVersion = -1;
46 // Assume the FLUORINE_VERSION version initially, as we no longer support pre-Fluorine versions.
47 private short raftVersion = RaftVersions.FLUORINE_VERSION;
49 private final PeerInfo peerInfo;
51 private LeaderInstallSnapshotState installSnapshotState;
53 private long slicedLogEntryIndex = NO_INDEX;
55 private boolean needsLeaderAddress;
/**
 * Creates the leader-side bookkeeping for a single follower, starting from a known match index.
 *
 * <p>The next index is seeded from {@code context.getCommitIndex()}.
 *
 * @param peerInfo the associated PeerInfo of the follower, must not be null.
 * @param matchIndex the initial match index.
 * @param context the RaftActorContext, dereferenced immediately and therefore must not be null.
 * @throws NullPointerException if {@code peerInfo} or {@code context} is null.
 */
@VisibleForTesting
FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) {
    this.nextIndex = context.getCommitIndex();
    this.matchIndex = matchIndex;
    this.context = context;
    this.peerInfo = requireNonNull(peerInfo);
}
/**
 * Creates an instance for a follower that has no matching index yet; the match index starts at
 * {@link #NO_INDEX}.
 *
 * @param peerInfo the associated PeerInfo of the follower.
 * @param context the RaftActorContext.
 */
public FollowerLogInformation(final PeerInfo peerInfo, final RaftActorContext context) {
    this(peerInfo, NO_INDEX, context);
}
83 * Increments the value of the follower's next index.
85 * @return the new value of nextIndex.
88 long incrNextIndex() {
93 * Decrements the value of the follower's next index, taking into account its reported last log index.
95 * @param followerLastIndex follower's last reported index.
96 * @return true if the next index was decremented, i.e. it was previously >= 0, false otherwise.
98 public boolean decrNextIndex(final long followerLastIndex) {
103 if (followerLastIndex >= 0 && nextIndex > followerLastIndex) {
104 // If the follower's last log index is lower than nextIndex, jump directly to it, so we converge
105 // on a common index more quickly.
106 nextIndex = followerLastIndex;
114 * Sets the index of the follower's next log entry.
116 * @param nextIndex the new index.
117 * @return true if the new index differed from the current index and the current index was updated, false
120 @SuppressWarnings("checkstyle:hiddenField")
121 public boolean setNextIndex(final long nextIndex) {
122 if (this.nextIndex != nextIndex) {
123 this.nextIndex = nextIndex;
131 * Increments the value of the follower's match index.
133 * @return the new value of matchIndex.
135 public long incrMatchIndex() {
140 * Sets the index of the follower's highest log entry.
142 * @param matchIndex the new index.
143 * @return true if the new index differed from the current index and the current index was updated, false
146 @SuppressWarnings("checkstyle:hiddenField")
147 public boolean setMatchIndex(final long matchIndex) {
148 // If the new match index is the index of the entry currently being sliced, then we know slicing is complete
149 // and the follower received the entry and responded so clear the slicedLogEntryIndex
150 if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) {
151 slicedLogEntryIndex = NO_INDEX;
154 if (this.matchIndex != matchIndex) {
155 this.matchIndex = matchIndex;
163 * Returns the identifier of the follower.
165 * @return the identifier of the follower.
167 public String getId() {
168 return peerInfo.getId();
172 * Returns the index of the next log entry to send to the follower.
174 * @return index of the follower's next log entry.
176 public long getNextIndex() {
181 * Returns the index of highest log entry known to be replicated on the follower.
183 * @return the index of highest log entry.
185 public long getMatchIndex() {
190 * Checks if the follower is active by comparing the time of the last activity with the election time out. The
191 * follower is active if some activity has occurred for the follower within the election time out interval.
193 * @return true if follower is active, false otherwise.
195 public boolean isFollowerActive() {
196 if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
200 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
201 return stopwatch.isRunning()
202 && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis();
206 * Marks the follower as active. This should be called when some activity has occurred for the follower.
208 public void markFollowerActive() {
209 if (stopwatch.isRunning()) {
216 * Marks the follower as inactive. This should only be called from unit tests.
219 public void markFollowerInActive() {
220 if (stopwatch.isRunning()) {
226 * Returns the time since the last activity occurred for the follower.
228 * @return time in nanoseconds since the last activity from the follower.
230 public long nanosSinceLastActivity() {
231 return stopwatch.elapsed(TimeUnit.NANOSECONDS);
235 * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid
236 * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
237 * yet within the current heart beat interval
239 * @param commitIndex current commitIndex
240 * @return true if it is OK to replicate, false otherwise
242 public boolean okToReplicate(final long commitIndex) {
243 if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
247 // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
248 // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
249 if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
250 && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
251 < context.getConfigParams().getHeartBeatInterval().toMillis()) {
255 resetLastReplicated();
259 private void resetLastReplicated() {
260 lastReplicatedIndex = getNextIndex();
261 if (lastReplicatedStopwatch.isRunning()) {
262 lastReplicatedStopwatch.reset();
264 lastReplicatedStopwatch.start();
268 * Returns the log entry payload data version of the follower.
270 * @return the payload data version.
272 public short getPayloadVersion() {
273 return payloadVersion;
277 * Sets the payload data version of the follower.
279 * @param payloadVersion the payload data version.
281 public void setPayloadVersion(final short payloadVersion) {
282 this.payloadVersion = payloadVersion;
286 * Returns the the raft version of the follower.
288 * @return the raft version of the follower.
290 public short getRaftVersion() {
295 * Sets the raft version of the follower.
297 * @param raftVersion the raft version.
299 public void setRaftVersion(final short raftVersion) {
300 checkArgument(raftVersion >= RaftVersions.FLUORINE_VERSION, "Unexpected version %s", raftVersion);
301 this.raftVersion = raftVersion;
305 * Returns the LeaderInstallSnapshotState for the in progress install snapshot.
307 * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise.
309 public @Nullable LeaderInstallSnapshotState getInstallSnapshotState() {
310 return installSnapshotState;
314 * Sets the LeaderInstallSnapshotState when an install snapshot is initiated.
316 * @param state the LeaderInstallSnapshotState
318 public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) {
319 if (installSnapshotState == null) {
320 installSnapshotState = requireNonNull(state);
325 * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
327 public void clearLeaderInstallSnapshotState() {
328 checkState(installSnapshotState != null);
329 installSnapshotState.close();
330 installSnapshotState = null;
334 * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus
335 * needs to be sliced into smaller chunks.
337 * @param index the log entry index or NO_INDEX to clear it
339 public void setSlicedLogEntryIndex(final long index) {
340 slicedLogEntryIndex = index;
344 * Return whether or not log entry slicing is currently in progress.
346 * @return true if slicing is currently in progress, false otherwise
348 public boolean isLogEntrySlicingInProgress() {
349 return slicedLogEntryIndex != NO_INDEX;
352 public void setNeedsLeaderAddress(final boolean value) {
353 needsLeaderAddress = value;
356 public @Nullable String needsLeaderAddress(final String leaderId) {
357 return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
360 public boolean hasStaleCommitIndex(final long commitIndex) {
361 return sentCommitIndex != commitIndex;
364 public void setSentCommitIndex(final long commitIndex) {
365 sentCommitIndex = commitIndex;
369 public String toString() {
370 return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
371 + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
372 + ", votingState=" + peerInfo.getVotingState()
373 + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
374 + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";