X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformationImpl.java;h=5bf37d6534e5b7b38e5664f67b2dbf0d2d86f113;hb=6e6659e77f3e07e157c81332b367dbbd05d21f2b;hp=94f9a53a850ae22537fe0607b5a16a1328b12532;hpb=fe4049d34de103016d11f3a9050853c6380646d3;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 94f9a53a85..5bf37d6534 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -8,53 +8,172 @@ package org.opendaylight.controller.cluster.raft; -import java.util.concurrent.atomic.AtomicLong; - -public class FollowerLogInformationImpl implements FollowerLogInformation{ +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; +public class FollowerLogInformationImpl implements FollowerLogInformation { private final String id; - private final AtomicLong nextIndex; + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + + private final RaftActorContext context; + + private long nextIndex; + + private long matchIndex; + + private long lastReplicatedIndex = -1L; - private final AtomicLong matchIndex; + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); - public FollowerLogInformationImpl(String id, AtomicLong nextIndex, - AtomicLong matchIndex) { + private short payloadVersion = -1; + + private FollowerState state = FollowerState.VOTING; + + public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) { this.id = id; - this.nextIndex = nextIndex; + this.nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; + this.context = context; } - public long incrNextIndex(){ - return nextIndex.incrementAndGet(); + @Override + public long incrNextIndex() { + return nextIndex++; } - @Override public long decrNextIndex() { - return nextIndex.decrementAndGet(); + @Override + public long decrNextIndex() { + return nextIndex--; } - @Override public void setNextIndex(long nextIndex) { - this.nextIndex.set(nextIndex); + @Override + public boolean setNextIndex(long nextIndex) { + if(this.nextIndex != nextIndex) { + this.nextIndex = nextIndex; + return true; + } + + return false; } + @Override public long incrMatchIndex(){ - return matchIndex.incrementAndGet(); + return matchIndex++; } - @Override public void setMatchIndex(long matchIndex) { - this.matchIndex.set(matchIndex); + @Override + public boolean setMatchIndex(long matchIndex) { + if(this.matchIndex != matchIndex) { + this.matchIndex = matchIndex; + return true; + } + + return false; } + @Override public String getId() { return id; } - public AtomicLong getNextIndex() { + @Override + public long getNextIndex() { return nextIndex; } - public AtomicLong getMatchIndex() { + @Override + public long getMatchIndex() { return matchIndex; } + @Override + public boolean isFollowerActive() { + if(state == FollowerState.VOTING_NOT_INITIALIZED) { + return false; + } + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + return (stopwatch.isRunning()) && + (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis()); + } + + @Override + public void markFollowerActive() { + if (stopwatch.isRunning()) { + stopwatch.reset(); + } + stopwatch.start(); + } + + @Override + public void markFollowerInActive() { + if (stopwatch.isRunning()) { + stopwatch.stop(); + } + } + + @Override + public long timeSinceLastActivity() { + return stopwatch.elapsed(TimeUnit.MILLISECONDS); + } + + @Override + public boolean okToReplicate() { + if(state == FollowerState.VOTING_NOT_INITIALIZED) { + return false; + } + + // Return false if we are trying to send duplicate data before the heartbeat interval + if(getNextIndex() == lastReplicatedIndex){ + if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams() + .getHeartBeatInterval().toMillis()){ + return false; + } + } + + resetLastReplicated(); + return true; + } + + private void resetLastReplicated(){ + lastReplicatedIndex = getNextIndex(); + if(lastReplicatedStopwatch.isRunning()){ + lastReplicatedStopwatch.reset(); + } + lastReplicatedStopwatch.start(); + } + + @Override + public short getPayloadVersion() { + return payloadVersion; + } + + @Override + public void setPayloadVersion(short payloadVersion) { + this.payloadVersion = payloadVersion; + } + + @Override + public boolean canParticipateInConsensus() { + return state == FollowerState.VOTING; + } + + @Override + public void setFollowerState(FollowerState state) { + this.state = state; + } + + @Override + public FollowerState getFollowerState() { + return state; + } + + @Override + public String toString() { + return "FollowerLogInformationImpl [id=" + id + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", state=" + state + ", stopwatch=" + + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" + + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + } }