From c9943f5bc72d4cde9356d3bd4cf73d36f4b2f754 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 19 Sep 2016 23:54:44 -0400 Subject: [PATCH 1/1] Fix warnings and javadocs in sal-akka-raft Fixed a lot of checkstyle warnings and cleaned up javadocs for classes in the org.opendaylight.controller.cluster.raft package. Change-Id: I67dd997701fe6eaf6c87e77954a4c1d4aa5fda69 Signed-off-by: Tom Pantelis --- .../raft/AbstractReplicatedLogImpl.java | 67 ++++--- .../cluster/raft/ClientRequestTracker.java | 11 +- .../controller/cluster/raft/ConfigParams.java | 60 +++--- .../cluster/raft/DefaultConfigParamsImpl.java | 3 +- .../controller/cluster/raft/ElectionTerm.java | 43 ++-- .../cluster/raft/ElectionTermImpl.java | 20 +- .../cluster/raft/FollowerLogInformation.java | 64 +++--- .../raft/FollowerLogInformationImpl.java | 39 ++-- .../cluster/raft/GetSnapshotReplyActor.java | 5 +- .../cluster/raft/ImmutableElectionTerm.java | 4 +- .../cluster/raft/NoopProcedure.java | 3 + .../cluster/raft/PeerAddressResolver.java | 1 + .../controller/cluster/raft/PeerInfo.java | 7 + .../controller/cluster/raft/RaftActor.java | 144 +++++++------- .../cluster/raft/RaftActorContext.java | 183 ++++++++++++------ .../cluster/raft/RaftActorContextImpl.java | 32 +-- ...ActorDelegatingPersistentDataProvider.java | 18 +- .../RaftActorLeadershipTransferCohort.java | 9 +- .../cluster/raft/RaftActorRecoveryCohort.java | 2 + .../raft/RaftActorRecoverySupport.java | 79 ++++---- .../RaftActorServerConfigurationSupport.java | 8 +- .../raft/RaftActorSnapshotMessageSupport.java | 8 +- .../controller/cluster/raft/RaftVersions.java | 2 + .../cluster/raft/ReplicatedLog.java | 28 ++- .../cluster/raft/ReplicatedLogEntry.java | 14 +- .../cluster/raft/ReplicatedLogImpl.java | 23 +-- .../cluster/raft/ReplicatedLogImplEntry.java | 7 + .../raft/ServerConfigurationPayload.java | 2 +- .../controller/cluster/raft/Snapshot.java | 6 + .../cluster/raft/SnapshotManager.java | 143 ++++++++------ .../cluster/raft/SnapshotState.java | 6 + .../cluster/raft/TimedRunnable.java | 11 +- 32 files changed, 604 insertions(+), 448 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index 69b78458b6..d3a44033a4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -36,19 +36,19 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { private long previousSnapshotTerm = -1; private int dataSize = 0; - public AbstractReplicatedLogImpl(long snapshotIndex, - long snapshotTerm, List unAppliedEntries, String logContext) { + protected AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm, + List unAppliedEntries, String logContext) { this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; this.logContext = logContext; this.journal = new ArrayList<>(unAppliedEntries.size()); - for(ReplicatedLogEntry entry: unAppliedEntries) { + for (ReplicatedLogEntry entry: unAppliedEntries) { append(entry); } } - public AbstractReplicatedLogImpl() { + protected AbstractReplicatedLogImpl() { this(-1L, -1L, Collections.emptyList(), ""); } @@ -108,7 +108,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return -1; } - for(int i = adjustedIndex; i < journal.size(); i++) { + for (int i = adjustedIndex; i < journal.size(); i++) { dataSize -= journal.get(i).size(); } @@ -119,7 +119,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public boolean append(ReplicatedLogEntry replicatedLogEntry) { - if(replicatedLogEntry.getIndex() > lastIndex()) { + if (replicatedLogEntry.getIndex() > lastIndex()) { journal.add(replicatedLogEntry); dataSize += replicatedLogEntry.size(); return true; @@ -147,41 +147,45 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { if (adjustedIndex >= 0 && adjustedIndex < size) { // physical index should be less than list size and >= 0 int maxIndex = adjustedIndex + maxEntries; - if(maxIndex > size){ + if (maxIndex > size) { maxIndex = size; } - if(maxDataSize == NO_MAX_SIZE) { + if (maxDataSize == NO_MAX_SIZE) { return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); } else { - List retList = new ArrayList<>(maxIndex - adjustedIndex); - long totalSize = 0; - for(int i = adjustedIndex; i < maxIndex; i++) { - ReplicatedLogEntry entry = journal.get(i); - totalSize += entry.size(); - if(totalSize <= maxDataSize) { - retList.add(entry); - } else { - if(retList.isEmpty()) { - // Edge case - the first entry's size exceeds the threshold. We need to return - // at least the first entry so add it here. - retList.add(entry); - } - - break; - } - } - - return retList; + return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize); } } else { return Collections.emptyList(); } } + private List copyJournalEntries(int fromIndex, int toIndex, long maxDataSize) { + List retList = new ArrayList<>(toIndex - fromIndex); + long totalSize = 0; + for (int i = fromIndex; i < toIndex; i++) { + ReplicatedLogEntry entry = journal.get(i); + totalSize += entry.size(); + if (totalSize <= maxDataSize) { + retList.add(entry); + } else { + if (retList.isEmpty()) { + // Edge case - the first entry's size exceeds the threshold. We need to return + // at least the first entry so add it here. + retList.add(entry); + } + + break; + } + } + + return retList; + } + @Override public long size() { - return journal.size(); + return journal.size(); } @Override @@ -196,7 +200,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return false; } int adjustedIndex = adjustedIndex(logEntryIndex); - return (adjustedIndex >= 0); + return adjustedIndex >= 0; } @Override @@ -236,7 +240,8 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { snapshottedJournal = new ArrayList<>(journal.size()); - List snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex)); + List snapshotJournalEntries = + journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex)); snapshottedJournal.addAll(snapshotJournalEntries); snapshotJournalEntries.clear(); @@ -255,7 +260,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { previousSnapshotTerm = -1; dataSize = 0; // need to recalc the datasize based on the entries left after precommit. - for(ReplicatedLogEntry logEntry : journal) { + for (ReplicatedLogEntry logEntry : journal) { dataSize += logEntry.size(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java index afe680cc21..0f14844d56 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java @@ -13,21 +13,24 @@ import org.opendaylight.yangtools.concepts.Identifier; public interface ClientRequestTracker { /** + * Returns the client actor that should be sent a response when consensus is achieved. * - * @return the client actor that should be sent a response when consensus is achieved + * @return the client actor */ ActorRef getClientActor(); /** + * Returns the identifier of the object that is to be replicated. For example a transaction identifier in the case + * of a transaction. * - * @return the identifier of the object that is to be replicated. For example a transaction identifier in the case - * of a transaction + * @return the identifier */ Identifier getIdentifier(); /** + * Returns the index of the log entry that is to be replicated. * - * @return the index of the log entry that is to be replicated + * @return the index */ long getIndex(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java index c63deae717..26a828b9b0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -23,84 +23,92 @@ import scala.concurrent.duration.FiniteDuration; */ public interface ConfigParams { /** - * The minimum number of entries to be present in the in-memory Raft log - * for a snapshot to be taken + * Returns the minimum number of entries to be present in the in-memory Raft log for a snapshot to be taken. * - * @return long + * @return the minimum number of entries. */ long getSnapshotBatchCount(); /** - * The percentage of total memory in the in-memory Raft log before a snapshot - * is to be taken + * Returns the percentage of total memory used in the in-memory Raft log before a snapshot should be taken. * - * @return int + * @return the percentage. */ int getSnapshotDataThresholdPercentage(); /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor + * Returns the interval at which a heart beat message should be sent to remote followers. * - * @return FiniteDuration + * @return the interval as a FiniteDuration. */ FiniteDuration getHeartBeatInterval(); /** - * The interval in which a new election would get triggered if no leader is found + * Returns the interval after which a new election should be triggered if no leader is available. * - * Normally its set to atleast twice the heart beat interval - * - * @return FiniteDuration + * @return the interval as a FiniteDuration. */ FiniteDuration getElectionTimeOutInterval(); /** - * The maximum election time variance. The election is scheduled using both - * the Election Timeout and Variance + * Returns the maximum election time variance. The election is scheduled using both the election timeout and variance. * - * @return int + * @return the election time variance. */ int getElectionTimeVariance(); /** - * The size (in bytes) of the snapshot chunk sent from Leader + * Returns the maximum size (in bytes) for the snapshot chunk sent from a Leader. + * + * @return the maximum size (in bytes). */ int getSnapshotChunkSize(); /** - * The number of journal log entries to batch on recovery before applying. + * Returns the maximum number of journal log entries to batch on recovery before applying. + * + * @return the maximum number of journal log entries. */ int getJournalRecoveryLogBatchSize(); /** - * The interval in which the leader needs to check itself if its isolated - * @return FiniteDuration + * Returns the interval in which the leader needs to check if its isolated. + * + * @return the interval in ms. */ long getIsolatedCheckIntervalInMillis(); /** - * The multiplication factor to be used to determine shard election timeout. The election timeout - * is determined by multiplying the election timeout factor with the heartbeat duration. + * Returns the multiplication factor to be used to determine the shard election timeout. The election timeout + * is determined by multiplying the election timeout factor with the heart beat duration. + * + * @return the election timeout factor. */ long getElectionTimeoutFactor(); /** + * Returns the RaftPolicy used to determine certain Raft behaviors. * - * @return An instance of org.opendaylight.controller.cluster.raft.policy.RaftPolicy or an instance of the - * DefaultRaftPolicy + * @return an instance of org.opendaylight.controller.cluster.raft.policy.RaftPolicy, if set, or an instance of the + * DefaultRaftPolicy. */ + @Nonnull RaftPolicy getRaftPolicy(); /** * Returns the PeerAddressResolver. + * + * @return the PeerAddressResolver instance. */ - @Nonnull PeerAddressResolver getPeerAddressResolver(); + @Nonnull + PeerAddressResolver getPeerAddressResolver(); /** - * @return the RaftPolicy class used by this configuration + * Returns the custom RaftPolicy class name. + * + * @return the RaftPolicy class name or null if none set. */ String getCustomRaftPolicyImplementationClass(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index f5f410c75b..6faf5df2e8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -174,8 +174,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass; LOG.info("Trying to use custom RaftPolicy {}", className); Class c = Class.forName(className); - RaftPolicy obj = (RaftPolicy)c.newInstance(); - return obj; + return (RaftPolicy)c.newInstance(); } catch (Exception e) { if(LOG.isDebugEnabled()) { LOG.error("Could not create custom raft policy, will stick with default", e); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java index 9f0d02edb9..ff79004bce 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java @@ -8,47 +8,48 @@ package org.opendaylight.controller.cluster.raft; +import javax.annotation.Nullable; + /** * ElectionTerm contains information about a RaftActors election term. *

* This information includes the last known current term of the RaftActor - * and which peer was voted for by the RaftActor in that term + * and which candidate was voted for by the RaftActor in that term. *

- * This class ensures that election term information is persisted + * This class ensures that election term information is persisted. */ public interface ElectionTerm { /** - * latest term server has seen (initialized to 0 - * on first boot, increases monotonically) + * Returns the current leader's Raft term. + * + * @return the current leader's Raft term. */ long getCurrentTerm(); /** - * candidateId that received vote in current - * term (or null if none) + * Returns the id of the candidate that this server voted for in current term. + * + * @return candidate id that received the vote or null if no candidate was voted for. */ + @Nullable String getVotedFor(); /** - * To be called mainly when we are recovering in-memory election state from - * persistent storage + * This method updates the in-memory election term state. This method should be called when recovering election + * state from persistent storage. * - * @param currentTerm - * @param votedFor + * @param term the election term. + * @param votedFor the candidate id that was voted for. */ - void update(long currentTerm, String votedFor); + void update(long term, @Nullable String votedFor); /** - * To be called when we need to update the current term either because we - * received a message from someone with a more up-to-date term or because we - * just voted for someone - *

- * This information needs to be persisted so that on recovery the replica - * can start itself in the right term and know if it has already voted in - * that term or not + * This method updates the in-memory election term state and persists it so it can be recovered on next restart. + * This method should be called when starting a new election or when a Raft RPC message is received with a higher + * term. * - * @param currentTerm - * @param votedFor + * @param term the election term. + * @param votedFor the candidate id that was voted for. */ - void updateAndPersist(long currentTerm, String votedFor); + void updateAndPersist(long term, @Nullable String votedFor); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java index e44247db4d..e123a64874 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java @@ -15,9 +15,6 @@ import org.slf4j.Logger; * Implementation of ElectionTerm for the RaftActor. */ class ElectionTermImpl implements ElectionTerm { - /** - * Identifier of the actor whose election term information this is - */ private long currentTerm = 0; private String votedFor = null; @@ -42,18 +39,17 @@ class ElectionTermImpl implements ElectionTerm { return votedFor; } - @Override public void update(long currentTerm, String votedFor) { - if(log.isDebugEnabled()) { - log.debug("{}: Set currentTerm={}, votedFor={}", logId, currentTerm, votedFor); - } - this.currentTerm = currentTerm; - this.votedFor = votedFor; + @Override + public void update(long newTerm, String newVotedFor) { + log.debug("{}: Set currentTerm={}, votedFor={}", logId, newTerm, newVotedFor); + this.currentTerm = newTerm; + this.votedFor = newVotedFor; } @Override - public void updateAndPersist(long currentTerm, String votedFor){ - update(currentTerm, votedFor); + public void updateAndPersist(long newTerm, String newVotedFor) { + update(newTerm, newVotedFor); // FIXME : Maybe first persist then update the state persistence.persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), NoopProcedure.instance()); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index c0855c7f71..d40589ad2b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -7,121 +7,139 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.common.annotations.VisibleForTesting; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; /** - * The state of the followers log as known by the Leader + * The state of the followers log as known by the Leader. */ public interface FollowerLogInformation { /** - * Increment the value of the nextIndex + * Increments the value of the follower's next index. * - * @return the new value of nextIndex + * @return the new value of nextIndex. */ long incrNextIndex(); /** - * Decrement the value of the nextIndex + * Decrements the value of the follower's next index. * - * @return the new value of nextIndex + * @return the new value of nextIndex, */ long decrNextIndex(); /** - * Sets the index of the next log entry for this follower. + * Sets the index of the follower's next log entry. * - * @param nextIndex + * @param nextIndex the new index. * @return true if the new index differed from the current index and the current index was updated, false * otherwise. */ boolean setNextIndex(long nextIndex); /** - * Increment the value of the matchIndex + * Increments the value of the follower's match index. * - * @return the new value of matchIndex + * @return the new value of matchIndex. */ long incrMatchIndex(); /** - * Sets the index of the highest log entry for this follower. + * Sets the index of the follower's highest log entry. * - * @param matchIndex + * @param matchIndex the new index. * @return true if the new index differed from the current index and the current index was updated, false * otherwise. */ boolean setMatchIndex(long matchIndex); /** + * Returns the identifier of the follower. * - * @return the identifier of the follower. This could simply be the url of the remote actor. + * @return the identifier of the follower. */ String getId(); /** - * @return index of the next log entry to send to that server (initialized to leader last log index + 1) + * Returns the index of the next log entry to send to the follower. + * + * @return index of the follower's next log entry. */ long getNextIndex(); /** - * @return index of highest log entry known to be replicated on server (initialized to 0, increases monotonically) + * Returns the index of highest log entry known to be replicated on the follower. + * + * @return the index of highest log entry. */ long getMatchIndex(); /** - * Checks if the follower is active by comparing the last updated with the duration + * Checks if the follower is active by comparing the time of the last activity with the election time out. The + * follower is active if some activity has occurred for the follower within the election time out interval. * - * @return true if follower is active, false otherwise + * @return true if follower is active, false otherwise. */ boolean isFollowerActive(); /** - * restarts the timeout clock of the follower + * Marks the follower as active. This should be called when some activity has occurred for the follower. */ void markFollowerActive(); /** - * This will stop the timeout clock + * Marks the follower as inactive. This should only be called from unit tests. */ + @VisibleForTesting void markFollowerInActive(); /** - * This will return the active time of follower, since it was last reset + * Returns the time since the last activity occurred for the follower. * - * @return time in milliseconds since the last activity from the follower + * @return time in milliseconds since the last activity from the follower. */ long timeSinceLastActivity(); /** - * This method checks if it is ok to replicate + * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid + * sending duplicate message too frequently if the last replicate message was sent and no reply has been received + * yet within the current heart beat interval * * @return true if it is ok to replicate, false otherwise */ boolean okToReplicate(); /** - * @return the payload data version of the follower. + * Returns the log entry payload data version of the follower. + * + * @return the payload data version. */ short getPayloadVersion(); /** * Sets the payload data version of the follower. + * + * @param payloadVersion the payload data version. */ void setPayloadVersion(short payloadVersion); /** + * Returns the the raft version of the follower. + * * @return the raft version of the follower. */ short getRaftVersion(); /** * Sets the raft version of the follower. + * + * @param raftVersion the raft version. */ - void setRaftVersion(short payloadVersion); + void setRaftVersion(short raftVersion); /** * Returns the LeaderInstallSnapshotState for the in progress install snapshot. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 9d58288282..f101635757 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -15,6 +15,12 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; +/** + * Implementation of the FollowerLogInformation interface. + * + * @author Moiz Raja + * @author Thomas Pantelis + */ public class FollowerLogInformationImpl implements FollowerLogInformation { private final Stopwatch stopwatch = Stopwatch.createUnstarted(); @@ -40,6 +46,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private LeaderInstallSnapshotState installSnapshotState; + /** + * Constructs an instance. + * + * @param peerInfo the associated PeerInfo of the follower. + * @param matchIndex the initial match index. + * @param context the RaftActorContext. + */ public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) { this.nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; @@ -59,7 +72,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean setNextIndex(long nextIndex) { - if(this.nextIndex != nextIndex) { + if (this.nextIndex != nextIndex) { this.nextIndex = nextIndex; return true; } @@ -68,13 +81,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { } @Override - public long incrMatchIndex(){ + public long incrMatchIndex() { return matchIndex++; } @Override public boolean setMatchIndex(long matchIndex) { - if(this.matchIndex != matchIndex) { + if (this.matchIndex != matchIndex) { this.matchIndex = matchIndex; return true; } @@ -99,13 +112,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean isFollowerActive() { - if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { return false; } long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - return (stopwatch.isRunning()) && - (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis()); + return stopwatch.isRunning() + && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis(); } @Override @@ -130,16 +143,14 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean okToReplicate() { - if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { + if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { return false; } // Return false if we are trying to send duplicate data before the heartbeat interval - if(getNextIndex() == lastReplicatedIndex){ - if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams() - .getHeartBeatInterval().toMillis()){ - return false; - } + if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) + < context.getConfigParams().getHeartBeatInterval().toMillis()) { + return false; } resetLastReplicated(); @@ -148,7 +159,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private void resetLastReplicated(){ lastReplicatedIndex = getNextIndex(); - if(lastReplicatedStopwatch.isRunning()){ + if (lastReplicatedStopwatch.isRunning()) { lastReplicatedStopwatch.reset(); } lastReplicatedStopwatch.start(); @@ -182,7 +193,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) { - if(this.installSnapshotState == null) { + if (this.installSnapshotState == null) { this.installSnapshotState = Preconditions.checkNotNull(state); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java index 73d9d5f854..3aac07f6da 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java @@ -41,7 +41,7 @@ class GetSnapshotReplyActor extends UntypedActor { @Override public void onReceive(Object message) { - if(message instanceof CaptureSnapshotReply) { + if (message instanceof CaptureSnapshotReply) { Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(), params.captureSnapshot.getUnAppliedEntries(), params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(), @@ -51,7 +51,8 @@ class GetSnapshotReplyActor extends UntypedActor { LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot); - params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)), getSelf()); + params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)), + getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); } else if (message instanceof ReceiveTimeout) { LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms", diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java index 2760b48318..56cda0307c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java @@ -32,12 +32,12 @@ public class ImmutableElectionTerm implements ElectionTerm { } @Override - public void update(long currentTerm, String votedFor) { + public void update(long newTerm, String newVotedFor) { throw new UnsupportedOperationException(); } @Override - public void updateAndPersist(long currentTerm, String votedFor) { + public void updateAndPersist(long newTerm, String newVotedFor) { throw new UnsupportedOperationException(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopProcedure.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopProcedure.java index c1267fa75b..d494c88283 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopProcedure.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopProcedure.java @@ -13,6 +13,8 @@ import akka.japi.Procedure; * An akka Procedure that does nothing. * * @author Thomas Pantelis + * + * @param the Procedure type */ public class NoopProcedure implements Procedure { @@ -28,5 +30,6 @@ public class NoopProcedure implements Procedure { @Override public void apply(Object notUsed) { + // nothing to do } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java index 13bbda619b..302f2fafa4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java @@ -14,6 +14,7 @@ import javax.annotation.Nullable; * * @author Thomas Pantelis */ +@FunctionalInterface public interface PeerAddressResolver { /** * Resolves a raft actor peer id to it's remote actor address. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java index 3d15d88e38..21b56d9a21 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java @@ -17,6 +17,13 @@ public class PeerInfo { private String address; private VotingState votingState; + /** + * Constructs an instance. + * + * @param id the id of the peer. + * @param address the address of the peer. + * @param votingState the VotingState of the peer. + */ public PeerInfo(String id, String address, VotingState votingState) { this.id = id; this.address = address; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 57eb2647c9..cb07e59c5b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -53,8 +53,6 @@ import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -100,11 +98,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis - protected final Logger LOG = LoggerFactory.getLogger(getClass()); - /** * This context should NOT be passed directly to any other actor it is - * only to be consumed by the RaftActorBehaviors + * only to be consumed by the RaftActorBehaviors. */ private final RaftActorContextImpl context; @@ -124,7 +120,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private boolean shuttingDown; - public RaftActor(String id, Map peerAddresses, + protected RaftActor(String id, Map peerAddresses, Optional configParams, short payloadVersion) { persistentProvider = new PersistentDataProvider(this); @@ -133,7 +129,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, - configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(), + configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), delegatingPersistenceProvider, LOG); context.setPayloadVersion(payloadVersion); @@ -159,12 +155,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override protected void handleRecover(Object message) { - if(raftRecovery == null) { + if (raftRecovery == null) { raftRecovery = newRaftActorRecoverySupport(); } boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider); - if(recoveryComplete) { + if (recoveryComplete) { onRecoveryComplete(); initializeBehavior(); @@ -178,7 +174,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @VisibleForTesting - void initializeBehavior(){ + void initializeBehavior() { changeCurrentBehavior(new Follower(context)); } @@ -229,7 +225,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long startTime = System.nanoTime(); - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("{}: Applying state for log index {} data {}", persistenceId(), applyState.getReplicatedLogEntry().getIndex(), applyState.getReplicatedLogEntry().getData()); @@ -241,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } long elapsedTime = System.nanoTime() - startTime; - if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ + if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); } @@ -261,9 +257,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); - } + LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persist(applyEntries, NoopProcedure.instance()); @@ -272,19 +266,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { new FindLeaderReply(getLeaderAddress()), getSelf() ); - } else if(message instanceof GetOnDemandRaftState) { + } else if (message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); - } else if(message instanceof InitiateCaptureSnapshot) { + } else if (message instanceof InitiateCaptureSnapshot) { captureSnapshot(); - } else if(message instanceof SwitchBehavior) { + } else if (message instanceof SwitchBehavior) { switchBehavior((SwitchBehavior) message); - } else if(message instanceof LeaderTransitioning) { + } else if (message instanceof LeaderTransitioning) { onLeaderTransitioning(); - } else if(message instanceof Shutdown) { + } else if (message instanceof Shutdown) { onShutDown(); - } else if(message instanceof Runnable) { + } else if (message instanceof Runnable) { ((Runnable)message).run(); - } else if(message instanceof NoopPayload) { + } else if (message instanceof NoopPayload) { persistData(null, null, (NoopPayload)message); } else if (!possiblyHandleBehaviorMessage(message)) { handleNonRaftCommand(message); @@ -310,7 +304,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { LOG.debug("{}: Initiating leader transfer", persistenceId()); - if(leadershipTransferInProgress == null) { + if (leadershipTransferInProgress == null) { leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override @@ -335,7 +329,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onShutDown() { LOG.debug("{}: onShutDown", persistenceId()); - if(shuttingDown) { + if (shuttingDown) { return; } @@ -380,16 +374,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onLeaderTransitioning() { LOG.debug("{}: onLeaderTransitioning", persistenceId()); Optional roleChangeNotifier = getRoleChangeNotifier(); - if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { + if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } } private void switchBehavior(SwitchBehavior message) { - if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { + if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); - if( newState == RaftState.Leader || newState == RaftState.Follower) { + if ( newState == RaftState.Leader || newState == RaftState.Follower) { switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()), AbstractRaftActorBehavior.createBehavior(context, message.getNewState())); getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); @@ -414,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Map peerAddresses = new HashMap<>(); Map peerVotingStates = new HashMap<>(); - for(PeerInfo info: context.getPeers()) { + for (PeerInfo info: context.getPeers()) { peerVotingStates.put(info.getId(), info.isVoting()); peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : ""); } @@ -446,11 +440,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { builder.lastLogTerm(lastLogEntry.getTerm()); } - if(getCurrentBehavior() instanceof AbstractLeader) { + if (getCurrentBehavior() instanceof AbstractLeader) { AbstractLeader leader = (AbstractLeader)getCurrentBehavior(); Collection followerIds = leader.getFollowerIds(); List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size()); - for(String id: followerIds) { + for (String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()), @@ -477,24 +471,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // it can happen that the state has not changed but the leader has changed. Optional roleChangeNotifier = getRoleChangeNotifier(); - if(!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) || - oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { - if(roleChangeNotifier.isPresent()) { + if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) + || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { + if (roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), currentBehavior.getLeaderPayloadVersion()), getSelf()); } onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); - if(leadershipTransferInProgress != null) { + if (leadershipTransferInProgress != null) { leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); } serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId()); } - if (roleChangeNotifier.isPresent() && - (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { + if (roleChangeNotifier.isPresent() + && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , currentBehavior.state().name()), getSelf()); } @@ -508,19 +502,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { public long snapshotSequenceNr() { // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal, // so that we can delete the persistent journal based on the saved sequence-number - // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot - // was saved and not the number we saved. - // We would want to override it , by asking akka to use the last-sequence number known to us. + // However , when akka replays the journal during recovery, it replays it from the sequence number when the + // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the + // last-sequence number known to us. return context.getSnapshotManager().getLastSequenceNumber(); } /** * When a derived RaftActor needs to persist something it must call * persistData. - * - * @param clientActor - * @param identifier - * @param data */ protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { @@ -528,14 +518,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); - } + LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); final RaftActorContext raftContext = getRaftActorContext(); replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { - if (!hasFollowers()){ + if (!hasFollowers()) { // Increment the Commit Index and the Last Applied values raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); raftContext.setLastApplied(replicatedLogEntry1.getIndex()); @@ -576,7 +564,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * Derived actors can call the isLeader method to check if the current - * RaftActor is the Leader or not + * RaftActor is the Leader or not. * * @return true it this RaftActor is a Leader false otherwise */ @@ -585,8 +573,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected final boolean isLeaderActive() { - return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader && - !shuttingDown && !isLeadershipTransferInProgress(); + return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader + && !shuttingDown && !isLeadershipTransferInProgress(); } private boolean isLeadershipTransferInProgress() { @@ -600,10 +588,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @return A reference to the leader if known, null otherwise */ - public ActorSelection getLeader(){ + public ActorSelection getLeader() { String leaderAddress = getLeaderAddress(); - if(leaderAddress == null){ + if (leaderAddress == null) { return null; } @@ -611,10 +599,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } /** + * Returns the id of the current leader. * * @return the current leader's id */ - protected final String getLeaderId(){ + protected final String getLeaderId() { return getCurrentBehavior().getLeaderId(); } @@ -623,7 +612,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return getCurrentBehavior().state(); } - protected Long getCurrentTerm(){ + protected Long getCurrentTerm() { return context.getTermInformation().getCurrentTerm(); } @@ -634,10 +623,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected void updateConfigParams(ConfigParams configParams) { // obtain the RaftPolicy for oldConfigParams and the updated one. - String oldRaftPolicy = context.getConfigParams(). - getCustomRaftPolicyImplementationClass(); - String newRaftPolicy = configParams. - getCustomRaftPolicyImplementationClass(); + String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass(); + String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass(); LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(), oldRaftPolicy, newRaftPolicy); @@ -651,7 +638,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { String previousLeaderId = behavior.getLeaderId(); short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion(); - LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId); + LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), + previousLeaderId); changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion)); } else { @@ -670,14 +658,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected void setPersistence(boolean persistent) { DataPersistenceProvider currentPersistence = persistence(); - if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { + if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { setPersistence(new PersistentDataProvider(this)); - if(getCurrentBehavior() != null) { + if (getCurrentBehavior() != null) { LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId()); captureSnapshot(); } - } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { + } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { setPersistence(new NonPersistentDataProvider() { /** * The way snapshotting works is, @@ -691,7 +679,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * */ @Override - public void saveSnapshot(Object o) { + public void saveSnapshot(Object object) { // Make saving Snapshot successful // Committing the snapshot here would end up calling commit in the creating state which would // be a state violation. That's why now we send a message to commit the snapshot. @@ -703,25 +691,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * setPeerAddress sets the address of a known peer at a later time. + * *

* This is to account for situations where a we know that a peer * exists but we do not know an address up-front. This may also be used in * situations where a known peer starts off in a different location and we * need to change it's address + * *

* Note that if the peerId does not match the list of peers passed to * this actor during construction an IllegalStateException will be thrown. - * - * @param peerId - * @param peerAddress */ - protected void setPeerAddress(String peerId, String peerAddress){ + protected void setPeerAddress(String peerId, String peerAddress) { context.setPeerAddress(peerId, peerAddress); } /** * The applyState method will be called by the RaftActor when some data - * needs to be applied to the actor's state + * needs to be applied to the actor's state. * * @param clientActor A reference to the client who sent this message. This * is the same reference that was passed to persistData @@ -763,7 +750,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onStateChanged(); /** - * Notifier Actor for this RaftActor to notify when a role change happens + * Notifier Actor for this RaftActor to notify when a role change happens. + * * @return ActorRef - ActorRef of the notifier or Optional.absent if none. */ protected abstract Optional getRoleChangeNotifier(); @@ -774,6 +762,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * work prior to performing the operation. On completion of any work, the run method must be called on the * given Runnable to proceed with the given operation. Important: the run method must be called on * this actor's thread dispatcher as as it modifies internal state. + * *

* The default implementation immediately runs the operation. * @@ -784,11 +773,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void onLeaderChanged(String oldLeader, String newLeader) { - } - private String getLeaderAddress(){ - if(isLeader()){ + private String getLeaderAddress() { + if (isLeader()) { return getSelf().path().toString(); } String leaderId = getLeaderId(); @@ -796,15 +784,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return null; } String peerAddress = context.getPeerAddress(leaderId); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", - persistenceId(), leaderId, peerAddress); - } + LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress); return peerAddress; } - protected boolean hasFollowers(){ + protected boolean hasFollowers() { return getRaftActorContext().hasFollowers(); } @@ -853,10 +838,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors. */ - private static abstract class BehaviorState implements Immutable { + private abstract static class BehaviorState implements Immutable { @Nullable abstract RaftActorBehavior getBehavior(); + @Nullable abstract String getLastValidLeaderId(); + @Nullable abstract String getLastLeaderId(); + @Nullable abstract short getLeaderPayloadVersion(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index ba6645f240..f9f9478dd8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -17,6 +17,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Collection; import java.util.Optional; import java.util.function.LongSupplier; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -31,29 +32,32 @@ import org.slf4j.Logger; */ public interface RaftActorContext { /** - * Create a new local actor - * @param props - * @return a reference to the newly created actor + * Creates a new local actor. + * + * @param props the Props used to create the actor. + * @return a reference to the newly created actor. */ ActorRef actorOf(Props props); /** - * Create a actor selection - * @param path - * @return an actor selection for the given actor path + * Creates an actor selection. + * + * @param path the path. + * @return an actor selection for the given actor path. */ ActorSelection actorSelection(String path); /** - * Get the identifier for the RaftActor. This identifier represents the - * name of the actor whose common state is being shared. For example the - * id could be 'inventory' + * Returns the identifier for the RaftActor. This identifier represents the + * name of the actor whose common state is being shared. * * @return the identifier */ String getId(); /** + * Returns the reference to the RaftActor. + * * @return A reference to the RaftActor itself. This could be used to send messages * to the RaftActor */ @@ -62,204 +66,255 @@ public interface RaftActorContext { /** * The akka Cluster singleton for the actor system if one is configured. * - * @return an Optional containing the CLuster instance is present. + * @return an Optional containing the Cluster instance is present. */ Optional getCluster(); /** - * @return the ElectionTerm information + * Returns the current ElectionTerm information. + * + * @return the ElectionTerm. */ + @Nonnull ElectionTerm getTermInformation(); /** - * @return index of highest log entry known to be committed (initialized to 0, increases monotonically) + * Returns the index of highest log entry known to be committed. + * + * @return index of highest log entry known to be committed. */ long getCommitIndex(); /** + * Sets the index of highest log entry known to be committed. + * * @param commitIndex new commit index */ void setCommitIndex(long commitIndex); /** - * @return index of highest log entry applied to state machine (initialized to 0, increases monotonically) + * Returns index of highest log entry applied to state machine. + * + * @return index of highest log entry applied to state machine. */ long getLastApplied(); - /** - * @param lastApplied the index of the last log entry that was applied to the state + * Sets index of highest log entry applied to state machine. + * + * @param lastApplied the new applied index. */ void setLastApplied(long lastApplied); /** + * Sets the ReplicatedLog instance. * - * @param replicatedLog the replicated log of the current RaftActor + * @param replicatedLog the ReplicatedLog instance. */ - void setReplicatedLog(ReplicatedLog replicatedLog); + void setReplicatedLog(@Nonnull ReplicatedLog replicatedLog); /** - * @return A representation of the log + * Returns the ReplicatedLog instance. + * + * @return the ReplicatedLog instance. */ + @Nonnull ReplicatedLog getReplicatedLog(); /** - * @return The ActorSystem associated with this context + * Returns the The ActorSystem associated with this context. + * + * @return the ActorSystem. */ + @Nonnull ActorSystem getActorSystem(); /** - * @return the logger to be used for logging messages to a log file + * Returns the logger to be used for logging messages. + * + * @return the logger. */ + @Nonnull Logger getLogger(); /** - * Get the address of the peer as a String. This is the same format in - * which a consumer would provide the address + * Gets the address of a peer as a String. This is the same format in which a consumer would provide the address. * - * @param peerId - * @return The address of the peer or null if the address has not yet been - * resolved + * @param peerId the id of the peer. + * @return the address of the peer or null if the address has not yet been resolved. */ + @Nullable String getPeerAddress(String peerId); /** - * @param serverCfgPayload + * Updates the peers and information to match the given ServerConfigurationPayload. + * + * @param serverCfgPayload the ServerConfigurationPayload. */ void updatePeerIds(ServerConfigurationPayload serverCfgPayload); /** + * Returns the PeerInfo instances for each peer. + * * @return list of PeerInfo */ + @Nonnull Collection getPeers(); /** - * @return the list of peer IDs. + * Returns the id's for each peer. + * + * @return the list of peer id's. */ + @Nonnull Collection getPeerIds(); /** - * Get the PeerInfo for the given peer. + * Returns the PeerInfo for the given peer. * * @param peerId - * @return the PeerInfo + * @return the PeerInfo or null if not found. */ + @Nullable PeerInfo getPeerInfo(String peerId); /** - * Add to actor peers + * Adds a new peer. * - * @param name - * @param address + * @param id the id of the new peer. + * @param address the address of the new peer. + * @param votingState the VotingState of the new peer. */ - void addToPeers(String name, String address, VotingState votingState); + void addToPeers(String id, String address, VotingState votingState); /** + * Removes a peer. * - * @param name + * @param id the id of the peer to remove. */ - void removePeer(String name); + void removePeer(String id); /** - * Given a peerId return the corresponding actor - *

+ * Returns an ActorSelection for a peer. * - * - * @param peerId - * @return The actorSelection corresponding to the peer or null if the - * address has not yet been resolved + * @param peerId the id of the peer. + * @return the actorSelection corresponding to the peer or null if the address has not yet been resolved. */ + @Nullable ActorSelection getPeerActorSelection(String peerId); /** - * Set Peer Address can be called at a later time to change the address of - * a known peer. + * Sets the address of a peer. * - *

- * Throws an IllegalStateException if the peer is unknown - * - * @param peerId - * @param peerAddress + * @param peerId the id of the peer. + * @param peerAddress the address of the peer. */ void setPeerAddress(String peerId, String peerAddress); /** - * @return ConfigParams + * Returns the ConfigParams instance. + * + * @return the ConfigParams instance. */ + @Nonnull ConfigParams getConfigParams(); /** + * Returns the SnapshotManager instance. * - * @return the SnapshotManager for this RaftActor + * @return the SnapshotManager instance. */ + @Nonnull SnapshotManager getSnapshotManager(); /** + * Returns the DataPersistenceProvider instance. * - * @return the DataPersistenceProvider for this RaftActor + * @return the DataPersistenceProvider instance. */ + @Nonnull DataPersistenceProvider getPersistenceProvider(); /** + * Determines if there are any peer followers. * - * @return true if the RaftActor has followers else false + * @return true if there are followers otherwise false. */ boolean hasFollowers(); /** + * Returns the total available memory for use in calculations. Normally this returns JVM's max memory but can be + * overridden for unit tests. * - * @return the total memory used by the ReplicatedLog + * @return the total memory. */ long getTotalMemory(); /** + * Sets the retriever of the total memory metric. * - * @param retriever a supplier of the total memory metric + * @param retriever a supplier of the total memory metric. */ @VisibleForTesting void setTotalMemoryRetriever(LongSupplier retriever); /** + * Returns the payload version to be used when replicating data. * - * @return the payload version to be used when replicating data + * @return the payload version. */ short getPayloadVersion(); /** - * @return an implementation of the RaftPolicy so that the Raft code can be adapted + * Returns the RaftPolicy used to determine certain Raft behaviors. + * + * @return the RaftPolicy instance. */ + @Nonnull RaftPolicy getRaftPolicy(); /** - * @return true if there are any dynamic server configuration changes available, - * false if static peer configurations are still in use + * Determines if there have been any dynamic server configuration changes applied. + * + * @return true if dynamic server configuration changes have been applied, false otherwise, meaning that static + * peer configuration is still in use. */ boolean isDynamicServerConfigurationInUse(); /** - * Configures the dynamic server configurations are avaialble for the RaftActor + * Sets that dynamic server configuration changes have been applied. */ void setDynamicServerConfigurationInUse(); /** - * @return the RaftActor's peer information as a ServerConfigurationPayload if the - * dynamic server configurations are available, otherwise returns null + * Returns the peer information as a ServerConfigurationPayload if dynamic server configurations have been applied. + * + * @param includeSelf include this peer's info. + * @return the peer information as a ServerConfigurationPayload or null if no dynamic server configurations have + * been applied. */ - @Nullable ServerConfigurationPayload getPeerServerInfo(boolean includeSelf); + @Nullable + ServerConfigurationPayload getPeerServerInfo(boolean includeSelf); /** - * @return true if this RaftActor is a voting member of the cluster, false otherwise. + * Determines if this peer is a voting member of the cluster. + * + * @return true if this peer is a voting member, false otherwise. */ boolean isVotingMember(); /** + * Determines if there are any voting peers. + * * @return true if there are any voting peers, false otherwise. */ boolean anyVotingPeers(); /** - * @return current behavior attached to the raft actor. + * Returns the current behavior attached to the RaftActor. + * + * @return current behavior. */ RaftActorBehavior getCurrentBehavior(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 926e9748d4..65567d3ca5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -32,6 +32,12 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; +/** + * Implementation of the RaftActorContext interface. + * + * @author Moiz Raja + * @author Thomas Pantelis + */ public class RaftActorContextImpl implements RaftActorContext { private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory(); @@ -51,7 +57,7 @@ public class RaftActorContextImpl implements RaftActorContext { private final Map peerInfoMap = new HashMap<>(); - private final Logger LOG; + private final Logger log; private ConfigParams configParams; @@ -87,7 +93,7 @@ public class RaftActorContextImpl implements RaftActorContext { this.lastApplied = lastApplied; this.configParams = configParams; this.persistenceProvider = persistenceProvider; - this.LOG = logger; + this.log = logger; for(Map.Entry e: peerAddresses.entrySet()) { peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING)); @@ -135,7 +141,7 @@ public class RaftActorContextImpl implements RaftActorContext { cluster = Optional.of(Cluster.get(getActorSystem())); } catch(Exception e) { // An exception means there's no cluster configured. This will only happen in unit tests. - LOG.debug("{}: Could not obtain Cluster: {}", getId(), e); + log.debug("{}: Could not obtain Cluster: {}", getId(), e); cluster = Optional.empty(); } } @@ -182,7 +188,7 @@ public class RaftActorContextImpl implements RaftActorContext { } @Override public Logger getLogger() { - return this.LOG; + return this.log; } @Override @@ -202,7 +208,7 @@ public class RaftActorContextImpl implements RaftActorContext { @Override public String getPeerAddress(String peerId) { - String peerAddress = null; + String peerAddress; PeerInfo peerInfo = peerInfoMap.get(peerId); if(peerInfo != null) { peerAddress = peerInfo.getAddress(); @@ -247,7 +253,7 @@ public class RaftActorContextImpl implements RaftActorContext { votingMember = false; } - LOG.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values()); + log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values()); setDynamicServerConfigurationInUse(); } @@ -257,8 +263,8 @@ public class RaftActorContextImpl implements RaftActorContext { } @Override - public void addToPeers(String id, String address, VotingState votingState) { - peerInfoMap.put(id, new PeerInfo(id, address, votingState)); + public void addToPeers(String peerId, String address, VotingState votingState) { + peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState)); numVotingPeers = -1; } @@ -284,7 +290,7 @@ public class RaftActorContextImpl implements RaftActorContext { public void setPeerAddress(String peerId, String peerAddress) { PeerInfo peerInfo = peerInfoMap.get(peerId); if(peerInfo != null) { - LOG.info("Peer address for peer {} set to {}", peerId, peerAddress); + log.info("Peer address for peer {} set to {}", peerId, peerAddress); peerInfo.setAddress(peerAddress); } } @@ -292,7 +298,7 @@ public class RaftActorContextImpl implements RaftActorContext { @Override public SnapshotManager getSnapshotManager() { if(snapshotManager == null){ - snapshotManager = new SnapshotManager(this, LOG); + snapshotManager = new SnapshotManager(this, log); } return snapshotManager; } @@ -309,7 +315,7 @@ public class RaftActorContextImpl implements RaftActorContext { @Override public boolean hasFollowers() { - return getPeerIds().size() > 0; + return !getPeerIds().isEmpty(); } @Override @@ -348,7 +354,7 @@ public class RaftActorContextImpl implements RaftActorContext { newConfig.add(new ServerInfo(getId(), votingMember)); } - return (new ServerConfigurationPayload(newConfig)); + return new ServerConfigurationPayload(newConfig); } @Override @@ -384,7 +390,7 @@ public class RaftActorContextImpl implements RaftActorContext { try { currentBehavior.close(); } catch (Exception e) { - LOG.debug("{}: Error closing behavior {}", getId(), currentBehavior.state()); + log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java index 84e2dafafa..378f33a2be 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java @@ -31,21 +31,21 @@ class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentData } @Override - public void persist(final T o, final Procedure procedure) { - if(getDelegate().isRecoveryApplicable()) { - super.persist(o, procedure); + public void persist(final T object, final Procedure procedure) { + if (getDelegate().isRecoveryApplicable()) { + super.persist(object, procedure); } else { - if(o instanceof ReplicatedLogEntry) { - Payload payload = ((ReplicatedLogEntry)o).getData(); - if(payload instanceof PersistentPayload) { + if (object instanceof ReplicatedLogEntry) { + Payload payload = ((ReplicatedLogEntry)object).getData(); + if (payload instanceof PersistentPayload) { // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes // on recovery if data persistence is later enabled. - persistentProvider.persist(payload, p -> procedure.apply(o)); + persistentProvider.persist(payload, p -> procedure.apply(object)); } else { - super.persist(o, procedure); + super.persist(object, procedure); } } else { - super.persist(o, procedure); + super.persist(object, procedure); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index 85980e2ca3..7efc7586d8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -136,12 +136,9 @@ public class RaftActorLeadershipTransferCohort { // safely run on the actor's thread dispatcher. FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS); newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(), - new Runnable() { - @Override - public void run() { - LOG.debug("{}: leader not elected in time", raftActor.persistenceId()); - finish(true); - } + (Runnable) () -> { + LOG.debug("{}: leader not elected in time", raftActor.persistenceId()); + finish(true); }, raftActor.getContext().system().dispatcher(), raftActor.self()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java index 30e27e17fe..c3760472ac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java @@ -20,6 +20,8 @@ public interface RaftActorRecoveryCohort { /** * This method is called during recovery at the start of a batch of state entries. Derived * classes should perform any initialization needed to start a batch. + * + * @param maxBatchSize the maximum batch size. */ void startLogRecoveryBatch(int maxBatchSize); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 15d98b5289..d0217a6bc0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; * * @author Thomas Pantelis */ + class RaftActorRecoverySupport { private final RaftActorContext context; private final RaftActorRecoveryCohort cohort; @@ -50,7 +51,7 @@ class RaftActorRecoverySupport { anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted); - if(isMigratedSerializable(message)) { + if (isMigratedSerializable(message)) { hasMigratedDataRecovered = true; } @@ -78,23 +79,23 @@ class RaftActorRecoverySupport { private void possiblyRestoreFromSnapshot() { byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot(); - if(restoreFromSnapshot == null) { + if (restoreFromSnapshot == null) { return; } - if(anyDataRecovered) { + if (anyDataRecovered) { log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty", context.getId()); return; } - try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) { + try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) { Snapshot snapshot = (Snapshot) ois.readObject(); log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot); context.getSnapshotManager().apply(new ApplySnapshot(snapshot)); - } catch(Exception e) { + } catch (Exception e) { log.error("{}: Error deserializing snapshot restore", context.getId(), e); } } @@ -104,27 +105,25 @@ class RaftActorRecoverySupport { } private void initRecoveryTimer() { - if(recoveryTimer == null) { + if (recoveryTimer == null) { recoveryTimer = Stopwatch.createStarted(); } } private void onRecoveredSnapshot(SnapshotOffer offer) { - if(log.isDebugEnabled()) { - log.debug("{}: SnapshotOffer called..", context.getId()); - } + log.debug("{}: SnapshotOffer called..", context.getId()); initRecoveryTimer(); Snapshot snapshot = (Snapshot) offer.snapshot(); - for(ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) { - if(isMigratedPayload(entry)) { + for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) { + if (isMigratedPayload(entry)) { hasMigratedDataRecovered = true; } } - if(!context.getPersistenceProvider().isRecoveryApplicable()) { + if (!context.getPersistenceProvider().isRecoveryApplicable()) { // We may have just transitioned to disabled and have a snapshot containing state data and/or log // entries - we don't want to preserve these, only the server config and election term info. @@ -149,7 +148,7 @@ class RaftActorRecoverySupport { if (snapshot.getServerConfiguration() != null) { context.updatePeerIds(snapshot.getServerConfiguration()); - if(isMigratedSerializable(snapshot.getServerConfiguration())) { + if (isMigratedSerializable(snapshot.getServerConfiguration())) { hasMigratedDataRecovered = true; } } @@ -161,35 +160,35 @@ class RaftActorRecoverySupport { } private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(), logEntry.getIndex(), logEntry.size()); } - if(isServerConfigurationPayload(logEntry)){ + if (isServerConfigurationPayload(logEntry)) { context.updatePeerIds((ServerConfigurationPayload)logEntry.getData()); } - if(isMigratedPayload(logEntry)) { + if (isMigratedPayload(logEntry)) { hasMigratedDataRecovered = true; } - if(context.getPersistenceProvider().isRecoveryApplicable()) { + if (context.getPersistenceProvider().isRecoveryApplicable()) { replicatedLog().append(logEntry); - } else if(!isPersistentPayload(logEntry)) { + } else if (!isPersistentPayload(logEntry)) { dataRecoveredWithPersistenceDisabled = true; } } private void onRecoveredApplyLogEntries(long toIndex) { - if(!context.getPersistenceProvider().isRecoveryApplicable()) { + if (!context.getPersistenceProvider().isRecoveryApplicable()) { dataRecoveredWithPersistenceDisabled = true; return; } long lastUnappliedIndex = context.getLastApplied() + 1; - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal // but the entry itself has made it to that state and recovered via the snapshot log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}", @@ -199,7 +198,7 @@ class RaftActorRecoverySupport { long lastApplied = lastUnappliedIndex - 1; for (long i = lastUnappliedIndex; i <= toIndex; i++) { ReplicatedLogEntry logEntry = replicatedLog().get(i); - if(logEntry != null) { + if (logEntry != null) { lastApplied++; batchRecoveredLogEntry(logEntry); } else { @@ -214,7 +213,7 @@ class RaftActorRecoverySupport { } private void onDeleteEntries(DeleteEntries deleteEntries) { - if(context.getPersistenceProvider().isRecoveryApplicable()) { + if (context.getPersistenceProvider().isRecoveryApplicable()) { replicatedLog().removeFrom(deleteEntries.getFromIndex()); } else { dataRecoveredWithPersistenceDisabled = true; @@ -225,14 +224,14 @@ class RaftActorRecoverySupport { initRecoveryTimer(); int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize(); - if(!isServerConfigurationPayload(logEntry)){ - if(currentRecoveryBatchCount == 0) { + if (!isServerConfigurationPayload(logEntry)) { + if (currentRecoveryBatchCount == 0) { cohort.startLogRecoveryBatch(batchSize); } cohort.appendRecoveredLogEntry(logEntry.getData()); - if(++currentRecoveryBatchCount >= batchSize) { + if (++currentRecoveryBatchCount >= batchSize) { endCurrentLogRecoveryBatch(); } } @@ -244,26 +243,25 @@ class RaftActorRecoverySupport { } private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) { - if(currentRecoveryBatchCount > 0) { + if (currentRecoveryBatchCount > 0) { endCurrentLogRecoveryBatch(); } String recoveryTime = ""; - if(recoveryTimer != null) { + if (recoveryTimer != null) { recoveryTimer.stop(); recoveryTime = " in " + recoveryTimer.toString(); recoveryTimer = null; } - log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + - "Persistence Id = " + context.getId() + - " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " + - "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(), + log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + "Persistence Id = " + + context.getId() + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " + + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(), replicatedLog().size()); - if(dataRecoveredWithPersistenceDisabled || - hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) { - if(hasMigratedDataRecovered) { + if (dataRecoveredWithPersistenceDisabled + || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) { + if (hasMigratedDataRecovered) { log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId()); } else { log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId()); @@ -274,14 +272,15 @@ class RaftActorRecoverySupport { // messages. Either way, we persist a snapshot and delete all the messages from the akka journal // to clean out unwanted messages. - Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1, + Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), + -1, -1, -1, -1, context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true)); persistentProvider.saveSnapshot(snapshot); persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber()); - } else if(hasMigratedDataRecovered) { + } else if (hasMigratedDataRecovered) { log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId()); context.getSnapshotManager().capture(replicatedLog().last(), -1); @@ -290,19 +289,19 @@ class RaftActorRecoverySupport { } } - private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){ + private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry) { return repLogEntry.getData() instanceof ServerConfigurationPayload; } - private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry){ + private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry) { return repLogEntry.getData() instanceof PersistentPayload; } - private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry){ + private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry) { return isMigratedSerializable(repLogEntry.getData()); } - private static boolean isMigratedSerializable(Object message){ + private static boolean isMigratedSerializable(Object message) { return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated(); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 2fa700256a..16c091a4c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -343,11 +343,11 @@ class RaftActorServerConfigurationSupport { } @Override - public void onNewOperation(ServerOperationContext operationContext) { + public void onNewOperation(ServerOperationContext newOperationContext) { if(timedOut) { - sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); + sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); } else { - super.onNewOperation(operationContext); + super.onNewOperation(newOperationContext); } } } @@ -538,7 +538,7 @@ class RaftActorServerConfigurationSupport { * * @param the operation type */ - private static abstract class ServerOperationContext { + private abstract static class ServerOperationContext { private final T operation; private final ActorRef clientRequestor; private final Identifier contextId; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 5173dc89f7..4a17becd5b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -71,7 +71,8 @@ class RaftActorSnapshotMessageSupport { } private void onCaptureSnapshotReply(byte[] snapshotBytes) { - log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length); + log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), + snapshotBytes.length); context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory()); } @@ -100,7 +101,7 @@ class RaftActorSnapshotMessageSupport { private void onGetSnapshot(ActorRef sender) { log.debug("{}: onGetSnapshot", context.getId()); - if(context.getPersistenceProvider().isRecoveryApplicable()) { + if (context.getPersistenceProvider().isRecoveryApplicable()) { CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot( context.getReplicatedLog().last(), -1, false); @@ -110,7 +111,8 @@ class RaftActorSnapshotMessageSupport { cohort.createSnapshot(snapshotReplyActor); } else { - Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1, + Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), + -1, -1, -1, -1, context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java index 9c70cf9c45..364ca5af7b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.raft; /** + * Enumerates the raft versions. + * * @author Thomas Pantelis */ public interface RaftVersions { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index e7f1be5ba3..2b527db174 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -26,20 +26,28 @@ public interface ReplicatedLog { * @return the ReplicatedLogEntry if found, otherwise null if the adjusted index less than 0 or * greater than the size of the in-memory journal. */ - @Nullable ReplicatedLogEntry get(long index); + @Nullable + ReplicatedLogEntry get(long index); /** * Return the last replicated log entry in the log or null of not found. + * + * @return the last replicated log entry in the log or null of not found. */ - @Nullable ReplicatedLogEntry last(); + @Nullable + ReplicatedLogEntry last(); /** * Return the index of the last entry in the log or -1 if the log is empty. + * + * @return the index of the last entry in the log or -1 if the log is empty. */ long lastIndex(); /** * Return the term of the last entry in the log or -1 if the log is empty. + * + * @return the term of the last entry in the log or -1 if the log is empty. */ long lastTerm(); @@ -85,6 +93,12 @@ public interface ReplicatedLog { */ void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry); + /** + * Appends an entry to the in-memory log and persists it as well. + * + * @param replicatedLogEntry the entry to append + * @param callback the Procedure to be notified when persistence is complete. + */ void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback); /** @@ -182,14 +196,16 @@ public interface ReplicatedLog { void snapshotRollback(); /** - * Size of the data in the log (in bytes) + * Returns the size of the data in the log (in bytes) + * + * @return the size of the data in the log (in bytes). */ int dataSize(); /** - * We decide if snapshot need to be captured based on the count/memory consumed. - * @param replicatedLogEntry + * Determines if a snapshot need to be captured based on the count/memory consumed. + * + * @param replicatedLogEntry the last log entry. */ void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry); - } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java index 1bced2404d..a09a0a23ac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java @@ -11,30 +11,34 @@ package org.opendaylight.controller.cluster.raft; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** - * Represents one entry in the replicated log + * Represents one entry in the replicated log. */ public interface ReplicatedLogEntry { /** + * Returns the payload/data to be replicated. * - * @return The payload/data to be replicated + * @return the payload/data */ Payload getData(); /** + * Returns the term of the entry. * - * @return The term of the entry + * @return the term */ long getTerm(); /** + * Returns the index of the entry. * - * @return The index of the entry + * @return the index */ long getIndex(); /** + * Returns the size of the entry in bytes. An approximate number may be good enough. * - * @return The size of the entry in bytes. An approximate number may be good enough. + * @return the size of the entry in bytes. */ int size(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index c81ecfbd21..4295633c0a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -19,11 +19,7 @@ import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private static final int DATA_SIZE_DIVIDER = 5; - private final Procedure deleteProcedure = new Procedure() { - @Override - public void apply(final DeleteEntries notUsed) { - } - }; + private final Procedure deleteProcedure = NoopProcedure.instance(); private final RaftActorContext context; private long dataSizeSinceLastSnapshot = 0L; @@ -111,19 +107,16 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { // handler. This also holds for multiple persist calls in context // of a single command. context.getPersistenceProvider().persist(replicatedLogEntry, - new Procedure() { - @Override - public void apply(final ReplicatedLogEntry param) throws Exception { - context.getLogger().debug("{}: persist complete {}", context.getId(), param); + param -> { + context.getLogger().debug("{}: persist complete {}", context.getId(), param); - int logEntrySize = param.size(); - dataSizeSinceLastSnapshot += logEntrySize; + int logEntrySize = param.size(); + dataSizeSinceLastSnapshot += logEntrySize; - if (callback != null) { - callback.apply(param); - } + if (callback != null) { + callback.apply(param); } } ); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java index e5bf72233c..1ff69e7f4a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java @@ -22,6 +22,13 @@ public final class ReplicatedLogImplEntry implements ReplicatedLogEntry, Seriali private final long term; private final Payload payload; + /** + * Constructs an instance. + * + * @param index the index + * @param term the term + * @param payload the payload + */ public ReplicatedLogImplEntry(final long index, final long term, final Payload payload) { this.index = index; this.term = term; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java index 2c20041e2e..865594715d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java @@ -47,7 +47,7 @@ public class ServerConfigurationPayload extends Payload implements PersistentPay @Override public int size() { - if(serializedSize < 0) { + if (serializedSize < 0) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(bos); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java index fe4b79ae3d..8a836bffb9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java @@ -11,6 +11,12 @@ import java.io.Serializable; import java.util.List; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +/** + * Represents a snapshot of the raft data. + * + * @author Moiz Raja + * @author Thomas Pantelis + */ public class Snapshot implements Serializable { private static final long serialVersionUID = -8298574936724056236L; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 3fc43c7fd1..5db4706c62 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -19,13 +19,19 @@ import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.slf4j.Logger; +/** + * Manages the capturing of snapshots for a RaftActor. + * + * @author Moiz Raja + * @author Thomas Pantelis + */ public class SnapshotManager implements SnapshotState { private final SnapshotState IDLE = new Idle(); private final SnapshotState PERSISTING = new Persisting(); private final SnapshotState CREATING = new Creating(); - private final Logger LOG; + private final Logger log; private final RaftActorContext context; private final LastAppliedTermInformationReader lastAppliedTermInformationReader = new LastAppliedTermInformationReader(); @@ -42,9 +48,15 @@ public class SnapshotManager implements SnapshotState { private ApplySnapshot applySnapshot; private Consumer applySnapshotProcedure; + /** + * Constructs an instance. + * + * @param context the RaftActorContext + * @param logger the Logger + */ public SnapshotManager(RaftActorContext context, Logger logger) { this.context = context; - this.LOG = logger; + this.log = logger; } public boolean isApplying() { @@ -108,14 +120,22 @@ public class SnapshotManager implements SnapshotState { return captureSnapshot; } - private boolean hasFollowers(){ + private boolean hasFollowers() { return context.hasFollowers(); } - private String persistenceId(){ + private String persistenceId() { return context.getId(); } + /** + * Constructs a CaptureSnapshot instance. + * + * @param lastLogEntry the last log entry for the snapshot. + * @param replicatedToAllIndex the index of the last entry replicated to all followers. + * @param installSnapshotInitiated true if snapshot is initiated to install on a follower. + * @return a new CaptureSnapshot instance. + */ public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean installSnapshotInitiated) { TermInformationReader lastAppliedTermInfoReader = @@ -135,11 +155,11 @@ public class SnapshotManager implements SnapshotState { long lastLogEntryIndex = lastAppliedIndex; long lastLogEntryTerm = lastAppliedTerm; - if(lastLogEntry != null) { + if (lastLogEntry != null) { lastLogEntryIndex = lastLogEntry.getIndex(); lastLogEntryTerm = lastLogEntry.getTerm(); } else { - LOG.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.", + log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm); } @@ -156,54 +176,55 @@ public class SnapshotManager implements SnapshotState { @Override public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { - LOG.debug("capture should not be called in state {}", this); + log.debug("capture should not be called in state {}", this); return false; } @Override - public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { - LOG.debug("captureToInstall should not be called in state {}", this); + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, + String targetFollower) { + log.debug("captureToInstall should not be called in state {}", this); return false; } @Override public void apply(ApplySnapshot snapshot) { - LOG.debug("apply should not be called in state {}", this); + log.debug("apply should not be called in state {}", this); } @Override public void persist(final byte[] snapshotBytes, final long totalMemory) { - LOG.debug("persist should not be called in state {}", this); + log.debug("persist should not be called in state {}", this); } @Override public void commit(final long sequenceNumber, long timeStamp) { - LOG.debug("commit should not be called in state {}", this); + log.debug("commit should not be called in state {}", this); } @Override public void rollback() { - LOG.debug("rollback should not be called in state {}", this); + log.debug("rollback should not be called in state {}", this); } @Override public long trimLog(final long desiredTrimIndex) { - LOG.debug("trimLog should not be called in state {}", this); + log.debug("trimLog should not be called in state {}", this); return -1; } protected long doTrimLog(final long desiredTrimIndex) { // we would want to keep the lastApplied as its used while capturing snapshots long lastApplied = context.getLastApplied(); - long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); + long tempMin = Math.min(desiredTrimIndex, lastApplied > -1 ? lastApplied - 1 : -1); - if(LOG.isTraceEnabled()) { - LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}", + if (log.isTraceEnabled()) { + log.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}", persistenceId(), desiredTrimIndex, lastApplied, tempMin); } if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { - LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin, + log.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin, context.getTermInformation().getCurrentTerm()); //use the term of the temp-min, since we check for isPresent, entry will not be null @@ -214,7 +235,7 @@ public class SnapshotManager implements SnapshotState { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if(tempMin > currentBehavior.getReplicatedToAllIndex()) { + if (tempMin > currentBehavior.getReplicatedToAllIndex()) { // It's possible a follower was lagging and an install snapshot advanced its match index past // the current replicatedToAllIndex. Since the follower is now caught up we should advance the // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely @@ -236,16 +257,16 @@ public class SnapshotManager implements SnapshotState { private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null); - if(captureSnapshot.isInstallSnapshotInitiated()) { - LOG.info("{}: Initiating snapshot capture {} to install on {}", + if (captureSnapshot.isInstallSnapshotInitiated()) { + log.info("{}: Initiating snapshot capture {} to install on {}", persistenceId(), captureSnapshot, targetFollower); } else { - LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); + log.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); } lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); - LOG.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber); + log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber); SnapshotManager.this.currentState = CREATING; @@ -253,7 +274,7 @@ public class SnapshotManager implements SnapshotState { createSnapshotProcedure.run(); } catch (Exception e) { SnapshotManager.this.currentState = IDLE; - LOG.error("Error creating snapshot", e); + log.error("Error creating snapshot", e); return false; } @@ -266,19 +287,20 @@ public class SnapshotManager implements SnapshotState { } @Override - public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { + public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, + String targetFollower) { return capture(lastLogEntry, replicatedToAllIndex, targetFollower); } @Override - public void apply(ApplySnapshot applySnapshot) { - SnapshotManager.this.applySnapshot = applySnapshot; + public void apply(ApplySnapshot toApply) { + SnapshotManager.this.applySnapshot = toApply; lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); - LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber); + log.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber); - context.getPersistenceProvider().saveSnapshot(applySnapshot.getSnapshot()); + context.getPersistenceProvider().saveSnapshot(toApply.getSnapshot()); SnapshotManager.this.currentState = PERSISTING; } @@ -310,10 +332,9 @@ public class SnapshotManager implements SnapshotState { context.getPersistenceProvider().saveSnapshot(snapshot); - LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot); + log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot); - long dataThreshold = totalMemory * - context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; + long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold; boolean logSizeExceededSnapshotBatchCount = @@ -321,15 +342,16 @@ public class SnapshotManager implements SnapshotState { final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) { - if(LOG.isDebugEnabled()) { - if(dataSizeThresholdExceeded) { - LOG.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}", + if (log.isDebugEnabled()) { + if (dataSizeThresholdExceeded) { + log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}", context.getId(), context.getReplicatedLog().dataSize(), dataThreshold, captureSnapshot.getLastAppliedIndex()); } else { - LOG.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}", + log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}", context.getId(), context.getReplicatedLog().size(), - context.getConfigParams().getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex()); + context.getConfigParams().getSnapshotBatchCount(), + captureSnapshot.getLastAppliedIndex()); } } @@ -342,11 +364,11 @@ public class SnapshotManager implements SnapshotState { // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an // install snapshot to a follower. - if(captureSnapshot.getReplicatedToAllIndex() >= 0) { + if (captureSnapshot.getReplicatedToAllIndex() >= 0) { currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); } - } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ + } else if (captureSnapshot.getReplicatedToAllIndex() != -1) { // clear the log based on replicatedToAllIndex context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), captureSnapshot.getReplicatedToAllTerm()); @@ -361,8 +383,8 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().getSnapshotTerm()); } - LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " + - "and term: {}", context.getId(), context.getReplicatedLog().getSnapshotIndex(), + log.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} and term: {}", + context.getId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm()); if (context.getId().equals(currentBehavior.getLeaderId()) @@ -386,9 +408,9 @@ public class SnapshotManager implements SnapshotState { @Override public void commit(final long sequenceNumber, long timeStamp) { - LOG.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber); + log.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber); - if(applySnapshot != null) { + if (applySnapshot != null) { try { Snapshot snapshot = applySnapshot.getSnapshot(); @@ -398,17 +420,17 @@ public class SnapshotManager implements SnapshotState { context.setCommitIndex(snapshot.getLastAppliedIndex()); context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor()); - if(snapshot.getServerConfiguration() != null) { + if (snapshot.getServerConfiguration() != null) { context.updatePeerIds(snapshot.getServerConfiguration()); } - if(snapshot.getState().length > 0 ) { + if (snapshot.getState().length > 0 ) { applySnapshotProcedure.accept(snapshot.getState()); } applySnapshot.getCallback().onSuccess(); } catch (Exception e) { - LOG.error("{}: Error applying snapshot", context.getId(), e); + log.error("{}: Error applying snapshot", context.getId(), e); } } else { context.getReplicatedLog().snapshotCommit(); @@ -425,11 +447,11 @@ public class SnapshotManager implements SnapshotState { @Override public void rollback() { // Nothing to rollback if we're applying a snapshot from the leader. - if(applySnapshot == null) { + if (applySnapshot == null) { context.getReplicatedLog().snapshotRollback(); - LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." + - "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), + log.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), context.getReplicatedLog().size()); @@ -457,20 +479,21 @@ public class SnapshotManager implements SnapshotState { private interface TermInformationReader { long getIndex(); + long getTerm(); } - static class LastAppliedTermInformationReader implements TermInformationReader{ + static class LastAppliedTermInformationReader implements TermInformationReader { private long index; private long term; - public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex, - ReplicatedLogEntry lastLogEntry, boolean hasFollowers){ + LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex, ReplicatedLogEntry lastLogEntry, + boolean hasFollowers) { ReplicatedLogEntry entry = log.get(originalIndex); this.index = -1L; this.term = -1L; if (!hasFollowers) { - if(lastLogEntry != null) { + if (lastLogEntry != null) { // since we have persisted the last-log-entry to persistent journal before the capture, // we would want to snapshot from this entry. index = lastLogEntry.getIndex(); @@ -479,7 +502,7 @@ public class SnapshotManager implements SnapshotState { } else if (entry != null) { index = entry.getIndex(); term = entry.getTerm(); - } else if(log.getSnapshotIndex() > -1){ + } else if (log.getSnapshotIndex() > -1) { index = log.getSnapshotIndex(); term = log.getSnapshotTerm(); } @@ -487,21 +510,21 @@ public class SnapshotManager implements SnapshotState { } @Override - public long getIndex(){ + public long getIndex() { return this.index; } @Override - public long getTerm(){ + public long getTerm() { return this.term; } } - private static class ReplicatedToAllTermInformationReader implements TermInformationReader{ + private static class ReplicatedToAllTermInformationReader implements TermInformationReader { private long index; private long term; - ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){ + ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex) { ReplicatedLogEntry entry = log.get(originalIndex); this.index = -1L; this.term = -1L; @@ -515,12 +538,12 @@ public class SnapshotManager implements SnapshotState { } @Override - public long getIndex(){ + public long getIndex() { return this.index; } @Override - public long getTerm(){ + public long getTerm() { return this.term; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index a6bcddfc13..639fdee155 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -10,6 +10,12 @@ package org.opendaylight.controller.cluster.raft; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +/** + * Interface for a snapshot phase state. + * + * @author Moiz Raja + * @author Thomas Pantelis + */ public interface SnapshotState { /** * @return true when a snapshot is being captured diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java index 933b134341..c58d6f90f3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java @@ -14,6 +14,7 @@ import scala.concurrent.duration.FiniteDuration; /** * An abstract class that implements a Runnable operation with a timer such that if the run method isn't * invoked within a timeout period, the operation is cancelled via {@link #doCancel}. + * *

* Note: this class is not thread safe and is intended for use only within the context of the same * actor that's passed on construction. The run method must be called on this actor's thread dispatcher as it @@ -28,17 +29,13 @@ abstract class TimedRunnable implements Runnable { TimedRunnable(FiniteDuration timeout, RaftActor actor) { Preconditions.checkNotNull(timeout); Preconditions.checkNotNull(actor); - cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(), new Runnable() { - @Override - public void run() { - cancel(); - } - }, actor.getContext().system().dispatcher(), actor.self()); + cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(), + (Runnable) () -> cancel(), actor.getContext().system().dispatcher(), actor.self()); } @Override public void run() { - if(canRun) { + if (canRun) { canRun = false; cancelTimer.cancel(); doRun(); -- 2.36.6