Send commitIndex updates to followers as soon as possible
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / FollowerLogInformation.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import java.util.concurrent.TimeUnit;
15 import javax.annotation.Nonnull;
16 import javax.annotation.Nullable;
17 import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
18
19 /**
20  * The state of the followers log as known by the Leader.
21  *
22  * @author Moiz Raja
23  * @author Thomas Pantelis
24  */
25 public final class FollowerLogInformation {
26     public static final long NO_INDEX = -1;
27
28     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
29
30     private final RaftActorContext context;
31
32     private long nextIndex;
33
34     private long matchIndex;
35
36     private long lastReplicatedIndex = -1L;
37
38     private long sentCommitIndex = -1L;
39
40     private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
41
42     private short payloadVersion = -1;
43
44     // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
45     // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
46     // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
47     // HELIUM_VERSION.
48     private short raftVersion = RaftVersions.HELIUM_VERSION;
49
50     private final PeerInfo peerInfo;
51
52     private LeaderInstallSnapshotState installSnapshotState;
53
54     private long slicedLogEntryIndex = NO_INDEX;
55
56     private boolean needsLeaderAddress;
57
58     /**
59      * Constructs an instance.
60      *
61      * @param peerInfo the associated PeerInfo of the follower.
62      * @param matchIndex the initial match index.
63      * @param context the RaftActorContext.
64      */
65     @VisibleForTesting
66     FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) {
67         this.nextIndex = context.getCommitIndex();
68         this.matchIndex = matchIndex;
69         this.context = context;
70         this.peerInfo = Preconditions.checkNotNull(peerInfo);
71     }
72
73     /**
74      * Constructs an instance with no matching index.
75      *
76      * @param peerInfo the associated PeerInfo of the follower.
77      * @param context the RaftActorContext.
78      */
79     public FollowerLogInformation(final PeerInfo peerInfo, final RaftActorContext context) {
80         this(peerInfo, NO_INDEX, context);
81     }
82
83     /**
84      * Increments the value of the follower's next index.
85      *
86      * @return the new value of nextIndex.
87      */
88     @VisibleForTesting
89     long incrNextIndex() {
90         return nextIndex++;
91     }
92
93     /**
94      * Decrements the value of the follower's next index, taking into account its reported last log index.
95      *
96      * @param followerLastIndex follower's last reported index.
97      * @return true if the next index was decremented, i.e. it was previously >= 0, false otherwise.
98      */
99     public boolean decrNextIndex(final long followerLastIndex) {
100         if (nextIndex < 0) {
101             return false;
102         }
103
104         if (followerLastIndex >= 0 && nextIndex > followerLastIndex) {
105             // If the follower's last log index is lower than nextIndex, jump directly to it, so we converge
106             // on a common index more quickly.
107             nextIndex = followerLastIndex;
108         } else {
109             nextIndex--;
110         }
111         return true;
112     }
113
114     /**
115      * Sets the index of the follower's next log entry.
116      *
117      * @param nextIndex the new index.
118      * @return true if the new index differed from the current index and the current index was updated, false
119      *              otherwise.
120      */
121     @SuppressWarnings("checkstyle:hiddenField")
122     public boolean setNextIndex(final long nextIndex) {
123         if (this.nextIndex != nextIndex) {
124             this.nextIndex = nextIndex;
125             return true;
126         }
127
128         return false;
129     }
130
131     /**
132      * Increments the value of the follower's match index.
133      *
134      * @return the new value of matchIndex.
135      */
136     public long incrMatchIndex() {
137         return matchIndex++;
138     }
139
140     /**
141      * Sets the index of the follower's highest log entry.
142      *
143      * @param matchIndex the new index.
144      * @return true if the new index differed from the current index and the current index was updated, false
145      *              otherwise.
146      */
147     @SuppressWarnings("checkstyle:hiddenField")
148     public boolean setMatchIndex(final long matchIndex) {
149         // If the new match index is the index of the entry currently being sliced, then we know slicing is complete
150         // and the follower received the entry and responded so clear the slicedLogEntryIndex
151         if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) {
152             slicedLogEntryIndex = NO_INDEX;
153         }
154
155         if (this.matchIndex != matchIndex) {
156             this.matchIndex = matchIndex;
157             return true;
158         }
159
160         return false;
161     }
162
163     /**
164      * Returns the identifier of the follower.
165      *
166      * @return the identifier of the follower.
167      */
168     public String getId() {
169         return peerInfo.getId();
170     }
171
172     /**
173      * Returns the index of the next log entry to send to the follower.
174      *
175      * @return index of the follower's next log entry.
176      */
177     public long getNextIndex() {
178         return nextIndex;
179     }
180
181     /**
182      * Returns the index of highest log entry known to be replicated on the follower.
183      *
184      * @return the index of highest log entry.
185      */
186     public long getMatchIndex() {
187         return matchIndex;
188     }
189
190     /**
191      * Checks if the follower is active by comparing the time of the last activity with the election time out. The
192      * follower is active if some activity has occurred for the follower within the election time out interval.
193      *
194      * @return true if follower is active, false otherwise.
195      */
196     public boolean isFollowerActive() {
197         if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
198             return false;
199         }
200
201         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
202         return stopwatch.isRunning()
203                 && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis();
204     }
205
206     /**
207      * Marks the follower as active. This should be called when some activity has occurred for the follower.
208      */
209     public void markFollowerActive() {
210         if (stopwatch.isRunning()) {
211             stopwatch.reset();
212         }
213         stopwatch.start();
214     }
215
216     /**
217      * Marks the follower as inactive. This should only be called from unit tests.
218      */
219     @VisibleForTesting
220     public void markFollowerInActive() {
221         if (stopwatch.isRunning()) {
222             stopwatch.stop();
223         }
224     }
225
226     /**
227      * Returns the time since the last activity occurred for the follower.
228      *
229      * @return time in nanoseconds since the last activity from the follower.
230      */
231     public long nanosSinceLastActivity() {
232         return stopwatch.elapsed(TimeUnit.NANOSECONDS);
233     }
234
235     /**
236      * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid
237      * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
238      * yet within the current heart beat interval
239      *
240      * @param commitIndex current commitIndex
241      * @return true if it is OK to replicate, false otherwise
242      */
243     public boolean okToReplicate(final long commitIndex) {
244         if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
245             return false;
246         }
247
248         // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
249         // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
250         if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
251                 && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
252                 < context.getConfigParams().getHeartBeatInterval().toMillis()) {
253             return false;
254         }
255
256         resetLastReplicated();
257         return true;
258     }
259
260     private void resetLastReplicated() {
261         lastReplicatedIndex = getNextIndex();
262         if (lastReplicatedStopwatch.isRunning()) {
263             lastReplicatedStopwatch.reset();
264         }
265         lastReplicatedStopwatch.start();
266     }
267
268     /**
269      * Returns the log entry payload data version of the follower.
270      *
271      * @return the payload data version.
272      */
273     public short getPayloadVersion() {
274         return payloadVersion;
275     }
276
277     /**
278      * Sets the payload data version of the follower.
279      *
280      * @param payloadVersion the payload data version.
281      */
282     public void setPayloadVersion(final short payloadVersion) {
283         this.payloadVersion = payloadVersion;
284     }
285
286     /**
287      * Returns the the raft version of the follower.
288      *
289      * @return the raft version of the follower.
290      */
291     public short getRaftVersion() {
292         return raftVersion;
293     }
294
295     /**
296      * Sets the raft version of the follower.
297      *
298      * @param raftVersion the raft version.
299      */
300     public void setRaftVersion(final short raftVersion) {
301         this.raftVersion = raftVersion;
302     }
303
304     /**
305      * Returns the LeaderInstallSnapshotState for the in progress install snapshot.
306      *
307      * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise.
308      */
309     @Nullable
310     public LeaderInstallSnapshotState getInstallSnapshotState() {
311         return installSnapshotState;
312     }
313
314     /**
315      * Sets the LeaderInstallSnapshotState when an install snapshot is initiated.
316      *
317      * @param state the LeaderInstallSnapshotState
318      */
319     public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) {
320         if (this.installSnapshotState == null) {
321             this.installSnapshotState = Preconditions.checkNotNull(state);
322         }
323     }
324
325     /**
326      * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
327      */
328     public void clearLeaderInstallSnapshotState() {
329         Preconditions.checkState(installSnapshotState != null);
330         installSnapshotState.close();
331         installSnapshotState = null;
332     }
333
334     /**
335      * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus
336      * needs to be sliced into smaller chunks.
337      *
338      * @param index the log entry index or NO_INDEX to clear it
339      */
340     public void setSlicedLogEntryIndex(final long index) {
341         slicedLogEntryIndex  = index;
342     }
343
344     /**
345      * Return whether or not log entry slicing is currently in progress.
346      *
347      * @return true if slicing is currently in progress, false otherwise
348      */
349     public boolean isLogEntrySlicingInProgress() {
350         return slicedLogEntryIndex != NO_INDEX;
351     }
352
353     public void setNeedsLeaderAddress(final boolean value) {
354         needsLeaderAddress = value;
355     }
356
357     public boolean hasStaleCommitIndex(final long commitIndex) {
358         return sentCommitIndex != commitIndex;
359     }
360
361     public void setSentCommitIndex(final long commitIndex) {
362         sentCommitIndex = commitIndex;
363     }
364
365     @Nullable
366     public String needsLeaderAddress(final String leaderId) {
367         return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
368     }
369
370     @Override
371     public String toString() {
372         return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
373                 + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
374                 + ", votingState=" + peerInfo.getVotingState()
375                 + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
376                 + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
377     }
378 }