Improve follower term conflict resolution
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / FollowerLogInformation.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import java.util.concurrent.TimeUnit;
15 import javax.annotation.Nonnull;
16 import javax.annotation.Nullable;
17 import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
18
19 /**
20  * The state of the followers log as known by the Leader.
21  *
22  * @author Moiz Raja
23  * @author Thomas Pantelis
24  */
25 public final class FollowerLogInformation {
26     public static final long NO_INDEX = -1;
27
28     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
29
30     private final RaftActorContext context;
31
32     private long nextIndex;
33
34     private long matchIndex;
35
36     private long lastReplicatedIndex = -1L;
37
38     private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
39
40     private short payloadVersion = -1;
41
42     // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
43     // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
44     // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
45     // HELIUM_VERSION.
46     private short raftVersion = RaftVersions.HELIUM_VERSION;
47
48     private final PeerInfo peerInfo;
49
50     private LeaderInstallSnapshotState installSnapshotState;
51
52     private long slicedLogEntryIndex = NO_INDEX;
53
54     private boolean needsLeaderAddress;
55
56     /**
57      * Constructs an instance.
58      *
59      * @param peerInfo the associated PeerInfo of the follower.
60      * @param matchIndex the initial match index.
61      * @param context the RaftActorContext.
62      */
63     @VisibleForTesting
64     FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) {
65         this.nextIndex = context.getCommitIndex();
66         this.matchIndex = matchIndex;
67         this.context = context;
68         this.peerInfo = Preconditions.checkNotNull(peerInfo);
69     }
70
71     /**
72      * Constructs an instance with no matching index.
73      *
74      * @param peerInfo the associated PeerInfo of the follower.
75      * @param context the RaftActorContext.
76      */
77     public FollowerLogInformation(final PeerInfo peerInfo, final RaftActorContext context) {
78         this(peerInfo, NO_INDEX, context);
79     }
80
81     /**
82      * Increments the value of the follower's next index.
83      *
84      * @return the new value of nextIndex.
85      */
86     @VisibleForTesting
87     long incrNextIndex() {
88         return nextIndex++;
89     }
90
91     /**
92      * Decrements the value of the follower's next index, taking into account its reported last log index.
93      *
94      * @param followerLastIndex follower's last reported index.
95      * @return true if the next index was decremented, i.e. it was previously >= 0, false otherwise.
96      */
97     public boolean decrNextIndex(final long followerLastIndex) {
98         if (nextIndex < 0) {
99             return false;
100         }
101
102         if (followerLastIndex >= 0 && nextIndex > followerLastIndex) {
103             // If the follower's last log index is lower than nextIndex, jump directly to it, so we converge
104             // on a common index more quickly.
105             nextIndex = followerLastIndex;
106         } else {
107             nextIndex--;
108         }
109         return true;
110     }
111
112     /**
113      * Sets the index of the follower's next log entry.
114      *
115      * @param nextIndex the new index.
116      * @return true if the new index differed from the current index and the current index was updated, false
117      *              otherwise.
118      */
119     @SuppressWarnings("checkstyle:hiddenField")
120     public boolean setNextIndex(final long nextIndex) {
121         if (this.nextIndex != nextIndex) {
122             this.nextIndex = nextIndex;
123             return true;
124         }
125
126         return false;
127     }
128
129     /**
130      * Increments the value of the follower's match index.
131      *
132      * @return the new value of matchIndex.
133      */
134     public long incrMatchIndex() {
135         return matchIndex++;
136     }
137
138     /**
139      * Sets the index of the follower's highest log entry.
140      *
141      * @param matchIndex the new index.
142      * @return true if the new index differed from the current index and the current index was updated, false
143      *              otherwise.
144      */
145     @SuppressWarnings("checkstyle:hiddenField")
146     public boolean setMatchIndex(final long matchIndex) {
147         // If the new match index is the index of the entry currently being sliced, then we know slicing is complete
148         // and the follower received the entry and responded so clear the slicedLogEntryIndex
149         if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) {
150             slicedLogEntryIndex = NO_INDEX;
151         }
152
153         if (this.matchIndex != matchIndex) {
154             this.matchIndex = matchIndex;
155             return true;
156         }
157
158         return false;
159     }
160
161     /**
162      * Returns the identifier of the follower.
163      *
164      * @return the identifier of the follower.
165      */
166     public String getId() {
167         return peerInfo.getId();
168     }
169
170     /**
171      * Returns the index of the next log entry to send to the follower.
172      *
173      * @return index of the follower's next log entry.
174      */
175     public long getNextIndex() {
176         return nextIndex;
177     }
178
179     /**
180      * Returns the index of highest log entry known to be replicated on the follower.
181      *
182      * @return the index of highest log entry.
183      */
184     public long getMatchIndex() {
185         return matchIndex;
186     }
187
188     /**
189      * Checks if the follower is active by comparing the time of the last activity with the election time out. The
190      * follower is active if some activity has occurred for the follower within the election time out interval.
191      *
192      * @return true if follower is active, false otherwise.
193      */
194     public boolean isFollowerActive() {
195         if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
196             return false;
197         }
198
199         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
200         return stopwatch.isRunning()
201                 && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis();
202     }
203
204     /**
205      * Marks the follower as active. This should be called when some activity has occurred for the follower.
206      */
207     public void markFollowerActive() {
208         if (stopwatch.isRunning()) {
209             stopwatch.reset();
210         }
211         stopwatch.start();
212     }
213
214     /**
215      * Marks the follower as inactive. This should only be called from unit tests.
216      */
217     @VisibleForTesting
218     public void markFollowerInActive() {
219         if (stopwatch.isRunning()) {
220             stopwatch.stop();
221         }
222     }
223
224     /**
225      * Returns the time since the last activity occurred for the follower.
226      *
227      * @return time in nanoseconds since the last activity from the follower.
228      */
229     public long nanosSinceLastActivity() {
230         return stopwatch.elapsed(TimeUnit.NANOSECONDS);
231     }
232
233     /**
234      * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid
235      * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
236      * yet within the current heart beat interval
237      *
238      * @return true if it is OK to replicate, false otherwise
239      */
240     public boolean okToReplicate() {
241         if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
242             return false;
243         }
244
245         // Return false if we are trying to send duplicate data before the heartbeat interval
246         if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
247                 < context.getConfigParams().getHeartBeatInterval().toMillis()) {
248             return false;
249         }
250
251         resetLastReplicated();
252         return true;
253     }
254
255     private void resetLastReplicated() {
256         lastReplicatedIndex = getNextIndex();
257         if (lastReplicatedStopwatch.isRunning()) {
258             lastReplicatedStopwatch.reset();
259         }
260         lastReplicatedStopwatch.start();
261     }
262
263     /**
264      * Returns the log entry payload data version of the follower.
265      *
266      * @return the payload data version.
267      */
268     public short getPayloadVersion() {
269         return payloadVersion;
270     }
271
272     /**
273      * Sets the payload data version of the follower.
274      *
275      * @param payloadVersion the payload data version.
276      */
277     public void setPayloadVersion(final short payloadVersion) {
278         this.payloadVersion = payloadVersion;
279     }
280
281     /**
282      * Returns the the raft version of the follower.
283      *
284      * @return the raft version of the follower.
285      */
286     public short getRaftVersion() {
287         return raftVersion;
288     }
289
290     /**
291      * Sets the raft version of the follower.
292      *
293      * @param raftVersion the raft version.
294      */
295     public void setRaftVersion(final short raftVersion) {
296         this.raftVersion = raftVersion;
297     }
298
299     /**
300      * Returns the LeaderInstallSnapshotState for the in progress install snapshot.
301      *
302      * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise.
303      */
304     @Nullable
305     public LeaderInstallSnapshotState getInstallSnapshotState() {
306         return installSnapshotState;
307     }
308
309     /**
310      * Sets the LeaderInstallSnapshotState when an install snapshot is initiated.
311      *
312      * @param state the LeaderInstallSnapshotState
313      */
314     public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) {
315         if (this.installSnapshotState == null) {
316             this.installSnapshotState = Preconditions.checkNotNull(state);
317         }
318     }
319
320     /**
321      * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
322      */
323     public void clearLeaderInstallSnapshotState() {
324         Preconditions.checkState(installSnapshotState != null);
325         installSnapshotState.close();
326         installSnapshotState = null;
327     }
328
329     /**
330      * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus
331      * needs to be sliced into smaller chunks.
332      *
333      * @param index the log entry index or NO_INDEX to clear it
334      */
335     public void setSlicedLogEntryIndex(final long index) {
336         slicedLogEntryIndex  = index;
337     }
338
339     /**
340      * Return whether or not log entry slicing is currently in progress.
341      *
342      * @return true if slicing is currently in progress, false otherwise
343      */
344     public boolean isLogEntrySlicingInProgress() {
345         return slicedLogEntryIndex != NO_INDEX;
346     }
347
348     public void setNeedsLeaderAddress(boolean value) {
349         needsLeaderAddress = value;
350     }
351
352     @Nullable
353     public String needsLeaderAddress(String leaderId) {
354         return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
355     }
356
357     @Override
358     public String toString() {
359         return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
360                 + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
361                 + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
362                 + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
363     }
364 }