Merge "Increase default negotiation timeout for netconf server to 30s"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / FollowerLogInformationImpl.java
index 9820638ab706ba0753da5ff1d574bc657da9b962..bcfd472bf6c394ab88b5e10ced1a0263be01e9f6 100644 (file)
@@ -9,71 +9,86 @@
 package org.opendaylight.controller.cluster.raft;
 
 import com.google.common.base.Stopwatch;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
-
     private final String id;
 
-    private final AtomicLong nextIndex;
+    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+
+    private final RaftActorContext context;
+
+    private long nextIndex;
 
-    private final AtomicLong matchIndex;
+    private long matchIndex;
 
-    private final Stopwatch stopwatch;
+    private long lastReplicatedIndex = -1L;
 
-    private final long followerTimeoutMillis;
+    private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
 
-    public FollowerLogInformationImpl(String id, long nextIndex,
-        long matchIndex, FiniteDuration followerTimeoutDuration) {
+
+    public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
-        this.nextIndex = new AtomicLong(nextIndex);
-        this.matchIndex = new AtomicLong(matchIndex);
-        this.stopwatch = new Stopwatch();
-        this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
+        this.nextIndex = context.getCommitIndex();
+        this.matchIndex = matchIndex;
+        this.context = context;
     }
 
-    public long incrNextIndex(){
-        return nextIndex.incrementAndGet();
+    @Override
+    public long incrNextIndex() {
+        return nextIndex++;
     }
 
     @Override
     public long decrNextIndex() {
-        return nextIndex.decrementAndGet();
+        return nextIndex--;
     }
 
     @Override
-    public void setNextIndex(long nextIndex) {
-        this.nextIndex.set(nextIndex);
+    public boolean setNextIndex(long nextIndex) {
+        if(this.nextIndex != nextIndex) {
+            this.nextIndex = nextIndex;
+            return true;
+        }
+
+        return false;
     }
 
+    @Override
     public long incrMatchIndex(){
-        return matchIndex.incrementAndGet();
+        return matchIndex++;
     }
 
     @Override
-    public void setMatchIndex(long matchIndex) {
-        this.matchIndex.set(matchIndex);
+    public boolean setMatchIndex(long matchIndex) {
+        if(this.matchIndex != matchIndex) {
+            this.matchIndex = matchIndex;
+            return true;
+        }
+
+        return false;
     }
 
+    @Override
     public String getId() {
         return id;
     }
 
-    public AtomicLong getNextIndex() {
+    @Override
+    public long getNextIndex() {
         return nextIndex;
     }
 
-    public AtomicLong getMatchIndex() {
+    @Override
+    public long getMatchIndex() {
         return matchIndex;
     }
 
     @Override
     public boolean isFollowerActive() {
         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-        return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis);
+        return (stopwatch.isRunning()) &&
+                (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
     }
 
     @Override
@@ -90,4 +105,42 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
             stopwatch.stop();
         }
     }
+
+    @Override
+    public long timeSinceLastActivity() {
+        return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean okToReplicate() {
+        // Return false if we are trying to send duplicate data before the heartbeat interval
+        if(getNextIndex() == lastReplicatedIndex){
+            if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
+                    .getHeartBeatInterval().toMillis()){
+                return false;
+            }
+        }
+
+        resetLastReplicated();
+        return true;
+    }
+
+    private void resetLastReplicated(){
+        lastReplicatedIndex = getNextIndex();
+        if(lastReplicatedStopwatch.isRunning()){
+            lastReplicatedStopwatch.reset();
+        }
+        lastReplicatedStopwatch.start();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
+                .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
+                .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
+                .append(", followerTimeoutMillis=")
+                .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
+        return builder.toString();
+    }
 }