From: Robert Varga Date: Wed, 24 Jan 2018 01:26:58 +0000 (+0100) Subject: Make FollowerLogInformation a class X-Git-Tag: release/oxygen~6 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c983ca95187c03af54867343c8eeb8903e103ea8 Make FollowerLogInformation a class This really is a mutable state holder, there is no good reason why it should be split into an interface and an implementation. Change-Id: I63a080a4f04bfa4baf1eb291a06140605a50762c Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 6f9efbbfac..9fd2edcb0e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -5,32 +5,100 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ + package org.opendaylight.controller.cluster.raft; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; /** * The state of the followers log as known by the Leader. + * + * @author Moiz Raja + * @author Thomas Pantelis */ -public interface FollowerLogInformation { - long NO_INDEX = -1; +public final class FollowerLogInformation { + public static final long NO_INDEX = -1; + + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + + private final RaftActorContext context; + + private long nextIndex; + + private long matchIndex; + + private long lastReplicatedIndex = -1L; + + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); + + private short payloadVersion = -1; + + // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's + // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron + // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is + // HELIUM_VERSION. + private short raftVersion = RaftVersions.HELIUM_VERSION; + + private final PeerInfo peerInfo; + + private LeaderInstallSnapshotState installSnapshotState; + + private long slicedLogEntryIndex = NO_INDEX; + + /** + * Constructs an instance. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param matchIndex the initial match index. + * @param context the RaftActorContext. + */ + @VisibleForTesting + FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) { + this.nextIndex = context.getCommitIndex(); + this.matchIndex = matchIndex; + this.context = context; + this.peerInfo = Preconditions.checkNotNull(peerInfo); + } + + /** + * Constructs an instance with no matching index. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param context the RaftActorContext. + */ + public FollowerLogInformation(final PeerInfo peerInfo, final RaftActorContext context) { + this(peerInfo, NO_INDEX, context); + } /** * Increments the value of the follower's next index. * * @return the new value of nextIndex. */ - long incrNextIndex(); + @VisibleForTesting + long incrNextIndex() { + return nextIndex++; + } /** * Decrements the value of the follower's next index. * * @return true if the next index was decremented, ie it was previously >= 0, false otherwise. */ - boolean decrNextIndex(); + public boolean decrNextIndex() { + if (nextIndex < 0) { + return false; + } + + nextIndex--; + return true; + } /** * Sets the index of the follower's next log entry. @@ -39,14 +107,24 @@ public interface FollowerLogInformation { * @return true if the new index differed from the current index and the current index was updated, false * otherwise. */ - boolean setNextIndex(long nextIndex); + @SuppressWarnings("checkstyle:hiddenField") + public boolean setNextIndex(final long nextIndex) { + if (this.nextIndex != nextIndex) { + this.nextIndex = nextIndex; + return true; + } + + return false; + } /** * Increments the value of the follower's match index. * * @return the new value of matchIndex. */ - long incrMatchIndex(); + public long incrMatchIndex() { + return matchIndex++; + } /** * Sets the index of the follower's highest log entry. @@ -55,28 +133,48 @@ public interface FollowerLogInformation { * @return true if the new index differed from the current index and the current index was updated, false * otherwise. */ - boolean setMatchIndex(long matchIndex); + @SuppressWarnings("checkstyle:hiddenField") + public boolean setMatchIndex(final long matchIndex) { + // If the new match index is the index of the entry currently being sliced, then we know slicing is complete + // and the follower received the entry and responded so clear the slicedLogEntryIndex + if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) { + slicedLogEntryIndex = NO_INDEX; + } + + if (this.matchIndex != matchIndex) { + this.matchIndex = matchIndex; + return true; + } + + return false; + } /** * Returns the identifier of the follower. * * @return the identifier of the follower. */ - String getId(); + public String getId() { + return peerInfo.getId(); + } /** * Returns the index of the next log entry to send to the follower. * * @return index of the follower's next log entry. */ - long getNextIndex(); + public long getNextIndex() { + return nextIndex; + } /** * Returns the index of highest log entry known to be replicated on the follower. * * @return the index of highest log entry. */ - long getMatchIndex(); + public long getMatchIndex() { + return matchIndex; + } /** * Checks if the follower is active by comparing the time of the last activity with the election time out. The @@ -84,63 +182,110 @@ public interface FollowerLogInformation { * * @return true if follower is active, false otherwise. */ - boolean isFollowerActive(); + public boolean isFollowerActive() { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + return stopwatch.isRunning() + && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis(); + } /** * Marks the follower as active. This should be called when some activity has occurred for the follower. */ - void markFollowerActive(); + public void markFollowerActive() { + if (stopwatch.isRunning()) { + stopwatch.reset(); + } + stopwatch.start(); + } /** * Marks the follower as inactive. This should only be called from unit tests. */ @VisibleForTesting - void markFollowerInActive(); - + public void markFollowerInActive() { + if (stopwatch.isRunning()) { + stopwatch.stop(); + } + } /** * Returns the time since the last activity occurred for the follower. * * @return time in milliseconds since the last activity from the follower. */ - long timeSinceLastActivity(); + public long timeSinceLastActivity() { + return stopwatch.elapsed(TimeUnit.MILLISECONDS); + } /** * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid * sending duplicate message too frequently if the last replicate message was sent and no reply has been received * yet within the current heart beat interval * - * @return true if it is ok to replicate, false otherwise + * @return true if it is OK to replicate, false otherwise */ - boolean okToReplicate(); + public boolean okToReplicate() { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + return false; + } + + // Return false if we are trying to send duplicate data before the heartbeat interval + if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) + < context.getConfigParams().getHeartBeatInterval().toMillis()) { + return false; + } + + resetLastReplicated(); + return true; + } + + private void resetLastReplicated() { + lastReplicatedIndex = getNextIndex(); + if (lastReplicatedStopwatch.isRunning()) { + lastReplicatedStopwatch.reset(); + } + lastReplicatedStopwatch.start(); + } /** * Returns the log entry payload data version of the follower. * * @return the payload data version. */ - short getPayloadVersion(); + public short getPayloadVersion() { + return payloadVersion; + } /** * Sets the payload data version of the follower. * * @param payloadVersion the payload data version. */ - void setPayloadVersion(short payloadVersion); + public void setPayloadVersion(final short payloadVersion) { + this.payloadVersion = payloadVersion; + } /** * Returns the the raft version of the follower. * * @return the raft version of the follower. */ - short getRaftVersion(); + public short getRaftVersion() { + return raftVersion; + } /** * Sets the raft version of the follower. * * @param raftVersion the raft version. */ - void setRaftVersion(short raftVersion); + public void setRaftVersion(final short raftVersion) { + this.raftVersion = raftVersion; + } /** * Returns the LeaderInstallSnapshotState for the in progress install snapshot. @@ -148,19 +293,29 @@ public interface FollowerLogInformation { * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise. */ @Nullable - LeaderInstallSnapshotState getInstallSnapshotState(); + public LeaderInstallSnapshotState getInstallSnapshotState() { + return installSnapshotState; + } /** * Sets the LeaderInstallSnapshotState when an install snapshot is initiated. * * @param state the LeaderInstallSnapshotState */ - void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state); + public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) { + if (this.installSnapshotState == null) { + this.installSnapshotState = Preconditions.checkNotNull(state); + } + } /** * Clears the LeaderInstallSnapshotState when an install snapshot is complete. */ - void clearLeaderInstallSnapshotState(); + public void clearLeaderInstallSnapshotState() { + Preconditions.checkState(installSnapshotState != null); + installSnapshotState.close(); + installSnapshotState = null; + } /** * Sets the index of the log entry whose payload size exceeds the maximum size for a single message and thus @@ -168,12 +323,24 @@ public interface FollowerLogInformation { * * @param index the log entry index or NO_INDEX to clear it */ - void setSlicedLogEntryIndex(long index); + public void setSlicedLogEntryIndex(final long index) { + slicedLogEntryIndex = index; + } /** * Return whether or not log entry slicing is currently in progress. * * @return true if slicing is currently in progress, false otherwise */ - boolean isLogEntrySlicingInProgress(); + public boolean isLogEntrySlicingInProgress() { + return slicedLogEntryIndex != NO_INDEX; + } + + @Override + public String toString() { + return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" + + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java deleted file mode 100644 index 25710880dd..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft; - -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; - -/** - * Implementation of the FollowerLogInformation interface. - * - * @author Moiz Raja - * @author Thomas Pantelis - */ -public class FollowerLogInformationImpl implements FollowerLogInformation { - private final Stopwatch stopwatch = Stopwatch.createUnstarted(); - - private final RaftActorContext context; - - private long nextIndex; - - private long matchIndex; - - private long lastReplicatedIndex = -1L; - - private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); - - private short payloadVersion = -1; - - // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's - // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron - // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is - // HELIUM_VERSION. - private short raftVersion = RaftVersions.HELIUM_VERSION; - - private final PeerInfo peerInfo; - - private LeaderInstallSnapshotState installSnapshotState; - - private long slicedLogEntryIndex = NO_INDEX; - - /** - * Constructs an instance. - * - * @param peerInfo the associated PeerInfo of the follower. - * @param matchIndex the initial match index. - * @param context the RaftActorContext. - */ - public FollowerLogInformationImpl(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) { - this.nextIndex = context.getCommitIndex(); - this.matchIndex = matchIndex; - this.context = context; - this.peerInfo = Preconditions.checkNotNull(peerInfo); - } - - @Override - public long incrNextIndex() { - return nextIndex++; - } - - @Override - public boolean decrNextIndex() { - if (nextIndex >= 0) { - nextIndex--; - return true; - } - - return false; - } - - @Override - @SuppressWarnings("checkstyle:hiddenField") - public boolean setNextIndex(final long nextIndex) { - if (this.nextIndex != nextIndex) { - this.nextIndex = nextIndex; - return true; - } - - return false; - } - - @Override - public long incrMatchIndex() { - return matchIndex++; - } - - @Override - @SuppressWarnings("checkstyle:hiddenField") - public boolean setMatchIndex(final long matchIndex) { - // If the new match index is the index of the entry currently being sliced, then we know slicing is complete - // and the follower received the entry and responded so clear the slicedLogEntryIndex - if (isLogEntrySlicingInProgress() && slicedLogEntryIndex == matchIndex) { - slicedLogEntryIndex = NO_INDEX; - } - - if (this.matchIndex != matchIndex) { - this.matchIndex = matchIndex; - return true; - } - - return false; - } - - @Override - public String getId() { - return peerInfo.getId(); - } - - @Override - public long getNextIndex() { - return nextIndex; - } - - @Override - public long getMatchIndex() { - return matchIndex; - } - - @Override - public boolean isFollowerActive() { - if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { - return false; - } - - long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - return stopwatch.isRunning() - && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis(); - } - - @Override - public void markFollowerActive() { - if (stopwatch.isRunning()) { - stopwatch.reset(); - } - stopwatch.start(); - } - - @Override - public void markFollowerInActive() { - if (stopwatch.isRunning()) { - stopwatch.stop(); - } - } - - @Override - public long timeSinceLastActivity() { - return stopwatch.elapsed(TimeUnit.MILLISECONDS); - } - - @Override - public boolean okToReplicate() { - if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { - return false; - } - - // Return false if we are trying to send duplicate data before the heartbeat interval - if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) - < context.getConfigParams().getHeartBeatInterval().toMillis()) { - return false; - } - - resetLastReplicated(); - return true; - } - - private void resetLastReplicated() { - lastReplicatedIndex = getNextIndex(); - if (lastReplicatedStopwatch.isRunning()) { - lastReplicatedStopwatch.reset(); - } - lastReplicatedStopwatch.start(); - } - - @Override - public short getPayloadVersion() { - return payloadVersion; - } - - @Override - public void setPayloadVersion(final short payloadVersion) { - this.payloadVersion = payloadVersion; - } - - @Override - public short getRaftVersion() { - return raftVersion; - } - - @Override - public void setRaftVersion(final short raftVersion) { - this.raftVersion = raftVersion; - } - - @Override - @Nullable - public LeaderInstallSnapshotState getInstallSnapshotState() { - return installSnapshotState; - } - - @Override - public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) { - if (this.installSnapshotState == null) { - this.installSnapshotState = Preconditions.checkNotNull(state); - } - } - - @Override - public void clearLeaderInstallSnapshotState() { - Preconditions.checkState(installSnapshotState != null); - installSnapshotState.close(); - installSnapshotState = null; - } - - @Override - public void setSlicedLogEntryIndex(final long index) { - slicedLogEntryIndex = index; - } - - @Override - public boolean isLogEntrySlicingInProgress() { - return slicedLogEntryIndex != NO_INDEX; - } - - @Override - public String toString() { - return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex - + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() - + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" - + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; - } -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index dc9cb23fac..ba998d3295 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -34,7 +34,6 @@ import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -116,7 +115,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { trackers.addAll(initializeFromLeader.trackers); } else { for (PeerInfo peerInfo: context.getPeers()) { - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context); followerToLog.put(peerInfo.getId(), followerLogInformation); } } @@ -149,8 +148,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } public void addFollower(final String followerId) { - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl( - context.getPeerInfo(followerId), -1, context); + FollowerLogInformation followerLogInformation = new FollowerLogInformation(context.getPeerInfo(followerId), + context); followerToLog.put(followerId, followerLogInformation); if (heartbeatSchedule == null) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationTest.java similarity index 91% rename from opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java rename to opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationTest.java index d68339ee86..d22e9e5eb0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationTest.java @@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; -public class FollowerLogInformationImplTest { +public class FollowerLogInformationTest { @Test public void testIsFollowerActive() { @@ -31,7 +31,7 @@ public class FollowerLogInformationImplTest { context.setConfigParams(configParams); FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 9, context); + new FollowerLogInformation(new PeerInfo("follower1", null, VotingState.VOTING), 9, context); assertFalse("Follower should be termed inactive before stopwatch starts", followerLogInformation.isFollowerActive()); @@ -68,7 +68,7 @@ public class FollowerLogInformationImplTest { MockRaftActorContext context = new MockRaftActorContext(); context.setCommitIndex(0); FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 10, context); + new FollowerLogInformation(new PeerInfo("follower1", null, VotingState.VOTING), 10, context); assertTrue(followerLogInformation.okToReplicate()); assertFalse(followerLogInformation.okToReplicate()); @@ -87,7 +87,7 @@ public class FollowerLogInformationImplTest { final PeerInfo peerInfo = new PeerInfo("follower1", null, VotingState.VOTING_NOT_INITIALIZED); MockRaftActorContext context = new MockRaftActorContext(); context.setCommitIndex(0); - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context); assertFalse(followerLogInformation.okToReplicate()); @@ -106,7 +106,7 @@ public class FollowerLogInformationImplTest { final PeerInfo peerInfo = new PeerInfo("follower1", null, VotingState.NON_VOTING); MockRaftActorContext context = new MockRaftActorContext(); context.setCommitIndex(0); - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context); assertTrue(followerLogInformation.okToReplicate()); @@ -119,7 +119,7 @@ public class FollowerLogInformationImplTest { MockRaftActorContext context = new MockRaftActorContext(); context.setCommitIndex(1); FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 1, context); + new FollowerLogInformation(new PeerInfo("follower1", null, VotingState.VOTING), 1, context); assertTrue(followerLogInformation.decrNextIndex()); assertEquals("getNextIndex", 0, followerLogInformation.getNextIndex());