package org.opendaylight.controller.cluster.raft;
+import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
+/**
+ * Implementation of the FollowerLogInformation interface.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
public class FollowerLogInformationImpl implements FollowerLogInformation {
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
-
- private final String id;
-
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final RaftActorContext context;
- private volatile long nextIndex;
+ private long nextIndex;
- private volatile long matchIndex;
+ private long matchIndex;
private long lastReplicatedIndex = -1L;
private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
+ private short payloadVersion = -1;
+
+ // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
+ // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
+ // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
+ // HELIUM_VERSION.
+ private short raftVersion = RaftVersions.HELIUM_VERSION;
- public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
- this.id = id;
+ private final PeerInfo peerInfo;
+
+ private LeaderInstallSnapshotState installSnapshotState;
+
+ /**
+ * Constructs an instance.
+ *
+ * @param peerInfo the associated PeerInfo of the follower.
+ * @param matchIndex the initial match index.
+ * @param context the RaftActorContext.
+ */
+ public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
this.nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
this.context = context;
+ this.peerInfo = Preconditions.checkNotNull(peerInfo);
}
@Override
- public long incrNextIndex(){
- return NEXT_INDEX_UPDATER.incrementAndGet(this);
+ public long incrNextIndex() {
+ return nextIndex++;
}
@Override
public long decrNextIndex() {
- return NEXT_INDEX_UPDATER.decrementAndGet(this);
+ return nextIndex--;
}
@Override
public boolean setNextIndex(long nextIndex) {
- if(this.nextIndex != nextIndex) {
+ if (this.nextIndex != nextIndex) {
this.nextIndex = nextIndex;
return true;
}
}
@Override
- public long incrMatchIndex(){
- return MATCH_INDEX_UPDATER.incrementAndGet(this);
+ public long incrMatchIndex() {
+ return matchIndex++;
}
@Override
public boolean setMatchIndex(long matchIndex) {
- if(this.matchIndex != matchIndex) {
+ if (this.matchIndex != matchIndex) {
this.matchIndex = matchIndex;
return true;
}
@Override
public String getId() {
- return id;
+ return peerInfo.getId();
}
@Override
@Override
public boolean isFollowerActive() {
+ if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ return false;
+ }
+
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
- return (stopwatch.isRunning()) &&
- (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ return stopwatch.isRunning()
+ && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis();
}
@Override
@Override
public boolean okToReplicate() {
+ if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ return false;
+ }
+
// Return false if we are trying to send duplicate data before the heartbeat interval
- if(getNextIndex() == lastReplicatedIndex){
- if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
- .getHeartBeatInterval().toMillis()){
- return false;
- }
+ if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ < context.getConfigParams().getHeartBeatInterval().toMillis()) {
+ return false;
}
resetLastReplicated();
return true;
}
- private void resetLastReplicated(){
+ private void resetLastReplicated() {
lastReplicatedIndex = getNextIndex();
- if(lastReplicatedStopwatch.isRunning()){
+ if (lastReplicatedStopwatch.isRunning()) {
lastReplicatedStopwatch.reset();
}
lastReplicatedStopwatch.start();
}
+ @Override
+ public short getPayloadVersion() {
+ return payloadVersion;
+ }
+
+ @Override
+ public void setPayloadVersion(short payloadVersion) {
+ this.payloadVersion = payloadVersion;
+ }
+
+ @Override
+ public short getRaftVersion() {
+ return raftVersion;
+ }
+
+ @Override
+ public void setRaftVersion(short raftVersion) {
+ this.raftVersion = raftVersion;
+ }
+
+ @Override
+ @Nullable
+ public LeaderInstallSnapshotState getInstallSnapshotState() {
+ return installSnapshotState;
+ }
+
+ @Override
+ public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
+ if (this.installSnapshotState == null) {
+ this.installSnapshotState = Preconditions.checkNotNull(state);
+ }
+ }
+
+ @Override
+ public void clearLeaderInstallSnapshotState() {
+ installSnapshotState = null;
+ }
+
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
- .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
- .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
- .append(", followerTimeoutMillis=")
- .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
- return builder.toString();
+ return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
+ + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
+ + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
+ + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
}
}