X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformationImpl.java;h=5525d75b7dcf3b6ec2d2ddcaca32f3b59d518d70;hb=655216a6c75aa29d31c4c56c56a5000db56ba233;hp=90e128256132437c5f2acd4cb861e0e74a759d44;hpb=a23ab6d60b7b57184a8fe59e282e46b448c86d6a;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 90e1282561..5525d75b7d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -10,21 +10,23 @@ package org.opendaylight.controller.cluster.raft; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; public class FollowerLogInformationImpl implements FollowerLogInformation { - private static final AtomicLongFieldUpdater NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex"); - private static final AtomicLongFieldUpdater MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex"); - private final String id; private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final RaftActorContext context; - private volatile long nextIndex; + private long nextIndex; + + private long matchIndex; + + private long lastReplicatedIndex = -1L; - private volatile long matchIndex; + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); + + private short payloadVersion = -1; public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) { this.id = id; @@ -34,28 +36,38 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { } @Override - public long incrNextIndex(){ - return NEXT_INDEX_UPDATER.incrementAndGet(this); + public long incrNextIndex() { + return nextIndex++; } @Override public long decrNextIndex() { - return NEXT_INDEX_UPDATER.decrementAndGet(this); + return nextIndex--; } @Override - public void setNextIndex(long nextIndex) { - this.nextIndex = nextIndex; + public boolean setNextIndex(long nextIndex) { + if(this.nextIndex != nextIndex) { + this.nextIndex = nextIndex; + return true; + } + + return false; } @Override public long incrMatchIndex(){ - return MATCH_INDEX_UPDATER.incrementAndGet(this); + return matchIndex++; } @Override - public void setMatchIndex(long matchIndex) { - this.matchIndex = matchIndex; + public boolean setMatchIndex(long matchIndex) { + if(this.matchIndex != matchIndex) { + this.matchIndex = matchIndex; + return true; + } + + return false; } @Override @@ -100,6 +112,28 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { return stopwatch.elapsed(TimeUnit.MILLISECONDS); } + @Override + public boolean okToReplicate() { + // Return false if we are trying to send duplicate data before the heartbeat interval + if(getNextIndex() == lastReplicatedIndex){ + if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams() + .getHeartBeatInterval().toMillis()){ + return false; + } + } + + resetLastReplicated(); + return true; + } + + private void resetLastReplicated(){ + lastReplicatedIndex = getNextIndex(); + if(lastReplicatedStopwatch.isRunning()){ + lastReplicatedStopwatch.reset(); + } + lastReplicatedStopwatch.start(); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -111,5 +145,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { return builder.toString(); } + @Override + public short getPayloadVersion() { + return payloadVersion; + } + @Override + public void setPayloadVersion(short payloadVersion) { + this.payloadVersion = payloadVersion; + } }