package org.opendaylight.controller.cluster.raft;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
+
+/**
+ * Implementation of the FollowerLogInformation interface.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
+public class FollowerLogInformationImpl implements FollowerLogInformation {
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+
+ private final RaftActorContext context;
+
+ private long nextIndex;
+
+ private long matchIndex;
-public class FollowerLogInformationImpl implements FollowerLogInformation{
+ private long lastReplicatedIndex = -1L;
- private final String id;
+ private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
- private final AtomicLong nextIndex;
+ private short payloadVersion = -1;
- private final AtomicLong matchIndex;
+ // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
+ // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
+ // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
+ // HELIUM_VERSION.
+ private short raftVersion = RaftVersions.HELIUM_VERSION;
- public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
- AtomicLong matchIndex) {
- this.id = id;
- this.nextIndex = nextIndex;
+ private final PeerInfo peerInfo;
+
+ private LeaderInstallSnapshotState installSnapshotState;
+
+ /**
+ * Constructs an instance.
+ *
+ * @param peerInfo the associated PeerInfo of the follower.
+ * @param matchIndex the initial match index.
+ * @param context the RaftActorContext.
+ */
+ public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
+ this.nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
+ this.context = context;
+ this.peerInfo = Preconditions.checkNotNull(peerInfo);
}
- public long incrNextIndex(){
- return nextIndex.incrementAndGet();
+ @Override
+ public long incrNextIndex() {
+ return nextIndex++;
}
- @Override public long decrNextIndex() {
- return nextIndex.decrementAndGet();
+ @Override
+ public boolean decrNextIndex() {
+ if (nextIndex >= 0) {
+ nextIndex--;
+ return true;
+ }
+
+ return false;
}
- @Override public void setNextIndex(long nextIndex) {
- this.nextIndex.set(nextIndex);
+ @Override
+ public boolean setNextIndex(long nextIndex) {
+ if (this.nextIndex != nextIndex) {
+ this.nextIndex = nextIndex;
+ return true;
+ }
+
+ return false;
}
- public long incrMatchIndex(){
- return matchIndex.incrementAndGet();
+ @Override
+ public long incrMatchIndex() {
+ return matchIndex++;
}
- @Override public void setMatchIndex(long matchIndex) {
- this.matchIndex.set(matchIndex);
+ @Override
+ public boolean setMatchIndex(long matchIndex) {
+ if (this.matchIndex != matchIndex) {
+ this.matchIndex = matchIndex;
+ return true;
+ }
+
+ return false;
}
+ @Override
public String getId() {
- return id;
+ return peerInfo.getId();
}
- public AtomicLong getNextIndex() {
+ @Override
+ public long getNextIndex() {
return nextIndex;
}
- public AtomicLong getMatchIndex() {
+ @Override
+ public long getMatchIndex() {
return matchIndex;
}
+ @Override
+ public boolean isFollowerActive() {
+ if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ return false;
+ }
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ return stopwatch.isRunning()
+ && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis();
+ }
+
+ @Override
+ public void markFollowerActive() {
+ if (stopwatch.isRunning()) {
+ stopwatch.reset();
+ }
+ stopwatch.start();
+ }
+
+ @Override
+ public void markFollowerInActive() {
+ if (stopwatch.isRunning()) {
+ stopwatch.stop();
+ }
+ }
+
+ @Override
+ public long timeSinceLastActivity() {
+ return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean okToReplicate() {
+ if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ return false;
+ }
+
+ // Return false if we are trying to send duplicate data before the heartbeat interval
+ if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ < context.getConfigParams().getHeartBeatInterval().toMillis()) {
+ return false;
+ }
+
+ resetLastReplicated();
+ return true;
+ }
+
+ private void resetLastReplicated() {
+ lastReplicatedIndex = getNextIndex();
+ if (lastReplicatedStopwatch.isRunning()) {
+ lastReplicatedStopwatch.reset();
+ }
+ lastReplicatedStopwatch.start();
+ }
+
+ @Override
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
+
+ @Override
+ public void setPayloadVersion(short payloadVersion) {
+ this.payloadVersion = payloadVersion;
+ }
+
+ @Override
+ public short getRaftVersion() {
+ return raftVersion;
+ }
+
+ @Override
+ public void setRaftVersion(short raftVersion) {
+ this.raftVersion = raftVersion;
+ }
+
+ @Override
+ @Nullable
+ public LeaderInstallSnapshotState getInstallSnapshotState() {
+ return installSnapshotState;
+ }
+
+ @Override
+ public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
+ if (this.installSnapshotState == null) {
+ this.installSnapshotState = Preconditions.checkNotNull(state);
+ }
+ }
+
+ @Override
+ public void clearLeaderInstallSnapshotState() {
+ Preconditions.checkState(installSnapshotState != null);
+ installSnapshotState.close();
+ installSnapshotState = null;
+ }
+
+ @Override
+ public String toString() {
+ return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
+ + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
+ + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
+ + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+ }
}