Bug 6540: Move LeaderInstallSnapshotState to FollowerLogInformation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / FollowerLogInformationImpl.java
index 04b9f163f4b6ad7be0f423e485902f7b3d5ccb79..a8a33c30b20ef3c95c3f16003e3110e1227fad90 100644 (file)
@@ -8,61 +8,83 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import scala.concurrent.duration.FiniteDuration;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
-    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
-    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
+    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
 
-    private final String id;
+    private final RaftActorContext context;
 
-    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+    private long nextIndex;
+
+    private long matchIndex;
+
+    private long lastReplicatedIndex = -1L;
+
+    private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
+
+    private short payloadVersion = -1;
 
-    private final long followerTimeoutMillis;
+    // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
+    // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
+    // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
+    // HELIUM_VERSION.
+    private short raftVersion = RaftVersions.HELIUM_VERSION;
 
-    private volatile long nextIndex;
+    private final PeerInfo peerInfo;
 
-    private volatile long matchIndex;
+    private LeaderInstallSnapshotState installSnapshotState;
 
-    public FollowerLogInformationImpl(String id, long nextIndex,
-        long matchIndex, FiniteDuration followerTimeoutDuration) {
-        this.id = id;
-        this.nextIndex = nextIndex;
+    public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
+        this.nextIndex = context.getCommitIndex();
         this.matchIndex = matchIndex;
-        this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
+        this.context = context;
+        this.peerInfo = Preconditions.checkNotNull(peerInfo);
     }
 
     @Override
-    public long incrNextIndex(){
-        return NEXT_INDEX_UPDATER.incrementAndGet(this);
+    public long incrNextIndex() {
+        return nextIndex++;
     }
 
     @Override
     public long decrNextIndex() {
-        return NEXT_INDEX_UPDATER.decrementAndGet(this);
+        return nextIndex--;
     }
 
     @Override
-    public void setNextIndex(long nextIndex) {
-        this.nextIndex = nextIndex;
+    public boolean setNextIndex(long nextIndex) {
+        if(this.nextIndex != nextIndex) {
+            this.nextIndex = nextIndex;
+            return true;
+        }
+
+        return false;
     }
 
     @Override
     public long incrMatchIndex(){
-        return MATCH_INDEX_UPDATER.incrementAndGet(this);
+        return matchIndex++;
     }
 
     @Override
-    public void setMatchIndex(long matchIndex) {
-        this.matchIndex = matchIndex;
+    public boolean setMatchIndex(long matchIndex) {
+        if(this.matchIndex != matchIndex) {
+            this.matchIndex = matchIndex;
+            return true;
+        }
+
+        return false;
     }
 
     @Override
     public String getId() {
-        return id;
+        return peerInfo.getId();
     }
 
     @Override
@@ -77,8 +99,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean isFollowerActive() {
+        if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+            return false;
+        }
+
         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-        return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis);
+        return (stopwatch.isRunning()) &&
+                (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
     }
 
     @Override
@@ -102,14 +129,72 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     }
 
     @Override
-    public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
-                .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
-                .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
-                .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]");
-        return builder.toString();
+    public boolean okToReplicate() {
+        if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+            return false;
+        }
+
+        // Return false if we are trying to send duplicate data before the heartbeat interval
+        if(getNextIndex() == lastReplicatedIndex){
+            if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
+                    .getHeartBeatInterval().toMillis()){
+                return false;
+            }
+        }
+
+        resetLastReplicated();
+        return true;
+    }
+
+    private void resetLastReplicated(){
+        lastReplicatedIndex = getNextIndex();
+        if(lastReplicatedStopwatch.isRunning()){
+            lastReplicatedStopwatch.reset();
+        }
+        lastReplicatedStopwatch.start();
+    }
+
+    @Override
+    public short getPayloadVersion() {
+        return payloadVersion;
+    }
+
+    @Override
+    public void setPayloadVersion(short payloadVersion) {
+        this.payloadVersion = payloadVersion;
+    }
+
+    @Override
+    public short getRaftVersion() {
+        return raftVersion;
+    }
+
+    @Override
+    public void setRaftVersion(short raftVersion) {
+        this.raftVersion = raftVersion;
     }
 
+    @Override
+    @Nullable
+    public LeaderInstallSnapshotState getInstallSnapshotState() {
+        return installSnapshotState;
+    }
+
+    @Override
+    public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
+        this.installSnapshotState = Preconditions.checkNotNull(state);
+    }
 
+    @Override
+    public void clearLeaderInstallSnapshotState() {
+        installSnapshotState = null;
+    }
+
+    @Override
+    public String toString() {
+        return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
+                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
+                + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
+                + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+    }
 }