X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformationImpl.java;h=a8a33c30b20ef3c95c3f16003e3110e1227fad90;hp=0fed63098d6da1edfb6229245ffd63d1e4e4b7df;hb=95d3c7975a423951dcbdecfbfa4cb6b7a23591cc;hpb=fdc585863663ca5a4baf83ef146390e3c845a567 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 0fed63098d..a8a33c30b2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -8,61 +8,83 @@ package org.opendaylight.controller.cluster.raft; +import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import scala.concurrent.duration.FiniteDuration; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; public class FollowerLogInformationImpl implements FollowerLogInformation { - private static final AtomicLongFieldUpdater NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex"); - private static final AtomicLongFieldUpdater MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex"); + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); - private final String id; + private final RaftActorContext context; - private final Stopwatch stopwatch = new Stopwatch(); + private long nextIndex; - private final long followerTimeoutMillis; + private long matchIndex; - private volatile long nextIndex; + private long lastReplicatedIndex = -1L; - private volatile long matchIndex; + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); - public FollowerLogInformationImpl(String id, long nextIndex, - long matchIndex, FiniteDuration followerTimeoutDuration) { - this.id = id; - this.nextIndex = nextIndex; + private short payloadVersion = -1; + + // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's + // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron + // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is + // HELIUM_VERSION. + private short raftVersion = RaftVersions.HELIUM_VERSION; + + private final PeerInfo peerInfo; + + private LeaderInstallSnapshotState installSnapshotState; + + public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) { + this.nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; - this.followerTimeoutMillis = followerTimeoutDuration.toMillis(); + this.context = context; + this.peerInfo = Preconditions.checkNotNull(peerInfo); } @Override - public long incrNextIndex(){ - return NEXT_INDEX_UPDATER.incrementAndGet(this); + public long incrNextIndex() { + return nextIndex++; } @Override public long decrNextIndex() { - return NEXT_INDEX_UPDATER.decrementAndGet(this); + return nextIndex--; } @Override - public void setNextIndex(long nextIndex) { - this.nextIndex = nextIndex; + public boolean setNextIndex(long nextIndex) { + if(this.nextIndex != nextIndex) { + this.nextIndex = nextIndex; + return true; + } + + return false; } @Override public long incrMatchIndex(){ - return MATCH_INDEX_UPDATER.incrementAndGet(this); + return matchIndex++; } @Override - public void setMatchIndex(long matchIndex) { - this.matchIndex = matchIndex; + public boolean setMatchIndex(long matchIndex) { + if(this.matchIndex != matchIndex) { + this.matchIndex = matchIndex; + return true; + } + + return false; } @Override public String getId() { - return id; + return peerInfo.getId(); } @Override @@ -77,8 +99,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean isFollowerActive() { + if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis); + return (stopwatch.isRunning()) && + (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis()); } @Override @@ -100,4 +127,74 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { public long timeSinceLastActivity() { return stopwatch.elapsed(TimeUnit.MILLISECONDS); } + + @Override + public boolean okToReplicate() { + if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + // Return false if we are trying to send duplicate data before the heartbeat interval + if(getNextIndex() == lastReplicatedIndex){ + if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams() + .getHeartBeatInterval().toMillis()){ + return false; + } + } + + resetLastReplicated(); + return true; + } + + private void resetLastReplicated(){ + lastReplicatedIndex = getNextIndex(); + if(lastReplicatedStopwatch.isRunning()){ + lastReplicatedStopwatch.reset(); + } + lastReplicatedStopwatch.start(); + } + + @Override + public short getPayloadVersion() { + return payloadVersion; + } + + @Override + public void setPayloadVersion(short payloadVersion) { + this.payloadVersion = payloadVersion; + } + + @Override + public short getRaftVersion() { + return raftVersion; + } + + @Override + public void setRaftVersion(short raftVersion) { + this.raftVersion = raftVersion; + } + + @Override + @Nullable + public LeaderInstallSnapshotState getInstallSnapshotState() { + return installSnapshotState; + } + + @Override + public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) { + this.installSnapshotState = Preconditions.checkNotNull(state); + } + + @Override + public void clearLeaderInstallSnapshotState() { + installSnapshotState = null; + } + + @Override + public String toString() { + return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" + + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + } }