import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class FollowerLogInformationImpl implements FollowerLogInformation {
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
-
private final String id;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final RaftActorContext context;
- private volatile long nextIndex;
+ private long nextIndex;
+
+ private long matchIndex;
+
+ private long lastReplicatedIndex = -1L;
+
+ private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
- private volatile long matchIndex;
public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
this.id = id;
}
@Override
- public long incrNextIndex(){
- return NEXT_INDEX_UPDATER.incrementAndGet(this);
+ public long incrNextIndex() {
+ return nextIndex++;
}
@Override
public long decrNextIndex() {
- return NEXT_INDEX_UPDATER.decrementAndGet(this);
+ return nextIndex--;
}
@Override
- public void setNextIndex(long nextIndex) {
- this.nextIndex = nextIndex;
+ public boolean setNextIndex(long nextIndex) {
+ if(this.nextIndex != nextIndex) {
+ this.nextIndex = nextIndex;
+ return true;
+ }
+
+ return false;
}
@Override
public long incrMatchIndex(){
- return MATCH_INDEX_UPDATER.incrementAndGet(this);
+ return matchIndex++;
}
@Override
- public void setMatchIndex(long matchIndex) {
- this.matchIndex = matchIndex;
+ public boolean setMatchIndex(long matchIndex) {
+ if(this.matchIndex != matchIndex) {
+ this.matchIndex = matchIndex;
+ return true;
+ }
+
+ return false;
}
@Override
return stopwatch.elapsed(TimeUnit.MILLISECONDS);
}
+ @Override
+ public boolean okToReplicate() {
+ // Return false if we are trying to send duplicate data before the heartbeat interval
+ if(getNextIndex() == lastReplicatedIndex){
+ if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
+ .getHeartBeatInterval().toMillis()){
+ return false;
+ }
+ }
+
+ resetLastReplicated();
+ return true;
+ }
+
+ private void resetLastReplicated(){
+ lastReplicatedIndex = getNextIndex();
+ if(lastReplicatedStopwatch.isRunning()){
+ lastReplicatedStopwatch.reset();
+ }
+ lastReplicatedStopwatch.start();
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
.append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
return builder.toString();
}
-
-
}