Fix warnings and javadocs in sal-akka-raft 65/45865/4
authorTom Pantelis <tpanteli@brocade.com>
Tue, 20 Sep 2016 03:54:44 +0000 (23:54 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 6 Oct 2016 18:35:25 +0000 (18:35 +0000)
Fixed a lot of checkstyle warnings and cleaned up javadocs for classes in the
org.opendaylight.controller.cluster.raft package.

Change-Id: I67dd997701fe6eaf6c87e77954a4c1d4aa5fda69
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
32 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopProcedure.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java

index 69b7845..d3a4403 100644 (file)
@@ -36,19 +36,19 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     private long previousSnapshotTerm = -1;
     private int dataSize = 0;
 
-    public AbstractReplicatedLogImpl(long snapshotIndex,
-        long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
+    protected AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm,
+            List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
         this.snapshotIndex = snapshotIndex;
         this.snapshotTerm = snapshotTerm;
         this.logContext = logContext;
 
         this.journal = new ArrayList<>(unAppliedEntries.size());
-        for(ReplicatedLogEntry entry: unAppliedEntries) {
+        for (ReplicatedLogEntry entry: unAppliedEntries) {
             append(entry);
         }
     }
 
-    public AbstractReplicatedLogImpl() {
+    protected AbstractReplicatedLogImpl() {
         this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
     }
 
@@ -108,7 +108,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
             return -1;
         }
 
-        for(int i = adjustedIndex; i < journal.size(); i++) {
+        for (int i = adjustedIndex; i < journal.size(); i++) {
             dataSize -= journal.get(i).size();
         }
 
@@ -119,7 +119,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean append(ReplicatedLogEntry replicatedLogEntry) {
-        if(replicatedLogEntry.getIndex() > lastIndex()) {
+        if (replicatedLogEntry.getIndex() > lastIndex()) {
             journal.add(replicatedLogEntry);
             dataSize += replicatedLogEntry.size();
             return true;
@@ -147,41 +147,45 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
             int maxIndex = adjustedIndex + maxEntries;
-            if(maxIndex > size){
+            if (maxIndex > size) {
                 maxIndex = size;
             }
 
-            if(maxDataSize == NO_MAX_SIZE) {
+            if (maxDataSize == NO_MAX_SIZE) {
                 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
             } else {
-                List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
-                long totalSize = 0;
-                for(int i = adjustedIndex; i < maxIndex; i++) {
-                    ReplicatedLogEntry entry = journal.get(i);
-                    totalSize += entry.size();
-                    if(totalSize <= maxDataSize) {
-                        retList.add(entry);
-                    } else {
-                        if(retList.isEmpty()) {
-                            // Edge case - the first entry's size exceeds the threshold. We need to return
-                            // at least the first entry so add it here.
-                            retList.add(entry);
-                        }
-
-                        break;
-                    }
-                }
-
-                return retList;
+                return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
             }
         } else {
             return Collections.emptyList();
         }
     }
 
+    private List<ReplicatedLogEntry> copyJournalEntries(int fromIndex, int toIndex, long maxDataSize) {
+        List<ReplicatedLogEntry> retList = new ArrayList<>(toIndex - fromIndex);
+        long totalSize = 0;
+        for (int i = fromIndex; i < toIndex; i++) {
+            ReplicatedLogEntry entry = journal.get(i);
+            totalSize += entry.size();
+            if (totalSize <= maxDataSize) {
+                retList.add(entry);
+            } else {
+                if (retList.isEmpty()) {
+                    // Edge case - the first entry's size exceeds the threshold. We need to return
+                    // at least the first entry so add it here.
+                    retList.add(entry);
+                }
+
+                break;
+            }
+        }
+
+        return retList;
+    }
+
     @Override
     public long size() {
-       return journal.size();
+        return journal.size();
     }
 
     @Override
@@ -196,7 +200,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
             return false;
         }
         int adjustedIndex = adjustedIndex(logEntryIndex);
-        return (adjustedIndex >= 0);
+        return adjustedIndex >= 0;
     }
 
     @Override
@@ -236,7 +240,8 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
         snapshottedJournal = new ArrayList<>(journal.size());
 
-        List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
+        List<ReplicatedLogEntry> snapshotJournalEntries =
+                journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
 
         snapshottedJournal.addAll(snapshotJournalEntries);
         snapshotJournalEntries.clear();
@@ -255,7 +260,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         previousSnapshotTerm = -1;
         dataSize = 0;
         // need to recalc the datasize based on the entries left after precommit.
-        for(ReplicatedLogEntry logEntry : journal) {
+        for (ReplicatedLogEntry logEntry : journal) {
             dataSize += logEntry.size();
         }
 
index afe680c..0f14844 100644 (file)
@@ -13,21 +13,24 @@ import org.opendaylight.yangtools.concepts.Identifier;
 
 public interface ClientRequestTracker {
     /**
+     * Returns the client actor that should be sent a response when consensus is achieved.
      *
-     * @return the client actor that should be sent a response when consensus is achieved
+     * @return the client actor
      */
     ActorRef getClientActor();
 
     /**
+     * Returns the identifier of the object that is to be replicated. For example a transaction identifier in the case
+     * of a transaction.
      *
-     * @return the identifier of the object that is to be replicated. For example a transaction identifier in the case
-     * of a transaction
+     * @return the identifier
      */
     Identifier getIdentifier();
 
     /**
+     * Returns the index of the log entry that is to be replicated.
      *
-     * @return the index of the log entry that is to be replicated
+     * @return the index
      */
     long getIndex();
 
index c63deae..26a828b 100644 (file)
@@ -23,84 +23,92 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public interface ConfigParams {
     /**
-     * The minimum number of entries to be present in the in-memory Raft log
-     * for a snapshot to be taken
+     * Returns the minimum number of entries to be present in the in-memory Raft log for a snapshot to be taken.
      *
-     * @return long
+     * @return the minimum number of entries.
      */
     long getSnapshotBatchCount();
 
     /**
-     * The percentage of total memory in the in-memory Raft log before a snapshot
-     * is to be taken
+     * Returns the percentage of total memory used in the in-memory Raft log before a snapshot should be taken.
      *
-     * @return int
+     * @return the percentage.
      */
     int getSnapshotDataThresholdPercentage();
 
     /**
-     * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
+     * Returns the interval at which a heart beat message should be sent to remote followers.
      *
-     * @return FiniteDuration
+     * @return the interval as a FiniteDuration.
      */
     FiniteDuration getHeartBeatInterval();
 
     /**
-     * The interval in which a new election would get triggered if no leader is found
+     * Returns the interval after which a new election should be triggered if no leader is available.
      *
-     * Normally its set to atleast twice the heart beat interval
-     *
-     * @return FiniteDuration
+     * @return the interval as a FiniteDuration.
      */
     FiniteDuration getElectionTimeOutInterval();
 
     /**
-     * The maximum election time variance. The election is scheduled using both
-     * the Election Timeout and Variance
+     * Returns the maximum election time variance. The election is scheduled using both the election timeout and variance.
      *
-     * @return int
+     * @return the election time variance.
      */
     int getElectionTimeVariance();
 
     /**
-     * The size (in bytes) of the snapshot chunk sent from Leader
+     * Returns the maximum size (in bytes) for the snapshot chunk sent from a Leader.
+     *
+     * @return the maximum size (in bytes).
      */
     int getSnapshotChunkSize();
 
     /**
-     * The number of journal log entries to batch on recovery before applying.
+     * Returns the maximum number of journal log entries to batch on recovery before applying.
+     *
+     * @return the maximum number of journal log entries.
      */
     int getJournalRecoveryLogBatchSize();
 
     /**
-     * The interval in which the leader needs to check itself if its isolated
-     * @return FiniteDuration
+     * Returns the interval in which the leader needs to check if its isolated.
+     *
+     * @return the interval in ms.
      */
     long getIsolatedCheckIntervalInMillis();
 
 
     /**
-     * The multiplication factor to be used to determine shard election timeout. The election timeout
-     * is determined by multiplying the election timeout factor with the heartbeat duration.
+     * Returns the multiplication factor to be used to determine the shard election timeout. The election timeout
+     * is determined by multiplying the election timeout factor with the heart beat duration.
+     *
+     * @return the election timeout factor.
      */
     long getElectionTimeoutFactor();
 
 
     /**
+     * Returns the RaftPolicy used to determine certain Raft behaviors.
      *
-     * @return An instance of org.opendaylight.controller.cluster.raft.policy.RaftPolicy or an instance of the
-     * DefaultRaftPolicy
+     * @return an instance of org.opendaylight.controller.cluster.raft.policy.RaftPolicy, if set,  or an instance of the
+     * DefaultRaftPolicy.
      */
+    @Nonnull
     RaftPolicy getRaftPolicy();
 
     /**
      * Returns the PeerAddressResolver.
+     *
+     * @return the PeerAddressResolver instance.
      */
-    @Nonnull PeerAddressResolver getPeerAddressResolver();
+    @Nonnull
+    PeerAddressResolver getPeerAddressResolver();
 
     /**
-     * @return the RaftPolicy class used by this configuration
+     * Returns the custom RaftPolicy class name.
+     *
+     * @return the RaftPolicy class name or null if none set.
      */
     String getCustomRaftPolicyImplementationClass();
 
index f5f410c..6faf5df 100644 (file)
@@ -174,8 +174,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
                 String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
                 LOG.info("Trying to use custom RaftPolicy {}", className);
                 Class<?> c = Class.forName(className);
-                RaftPolicy obj = (RaftPolicy)c.newInstance();
-                return obj;
+                return (RaftPolicy)c.newInstance();
             } catch (Exception e) {
                 if(LOG.isDebugEnabled()) {
                     LOG.error("Could not create custom raft policy, will stick with default", e);
index 9f0d02e..ff79004 100644 (file)
@@ -8,47 +8,48 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import javax.annotation.Nullable;
+
 /**
  * ElectionTerm contains information about a RaftActors election term.
  * <p>
  * This information includes the last known current term of the RaftActor
- * and which peer was voted for by the RaftActor in that term
+ * and which candidate was voted for by the RaftActor in that term.
  * <p>
- * This class ensures that election term information is persisted
+ * This class ensures that election term information is persisted.
  */
 public interface ElectionTerm {
     /**
-     * latest term server has seen (initialized to 0
-     * on first boot, increases monotonically)
+     * Returns the current leader's Raft term.
+     *
+     * @return the current leader's Raft term.
      */
     long getCurrentTerm();
 
     /**
-     * candidateId that received vote in current
-     * term (or null if none)
+     * Returns the id of the candidate that this server voted for in current term.
+     *
+     * @return candidate id that received the vote or null if no candidate was voted for.
      */
+    @Nullable
     String getVotedFor();
 
     /**
-     * To be called mainly when we are recovering in-memory election state from
-     * persistent storage
+     * This method updates the in-memory election term state. This method should be called when recovering election
+     * state from persistent storage.
      *
-     * @param currentTerm
-     * @param votedFor
+     * @param term the election term.
+     * @param votedFor the candidate id that was voted for.
      */
-    void update(long currentTerm, String votedFor);
+    void update(long term, @Nullable String votedFor);
 
     /**
-     * To be called when we need to update the current term either because we
-     * received a message from someone with a more up-to-date term or because we
-     * just voted for someone
-     * <p>
-     * This information needs to be persisted so that on recovery the replica
-     * can start itself in the right term and know if it has already voted in
-     * that term or not
+     * This method updates the in-memory election term state and persists it so it can be recovered on next restart.
+     * This method should be called when starting a new election or when a Raft RPC message is received  with a higher
+     * term.
      *
-     * @param currentTerm
-     * @param votedFor
+     * @param term the election term.
+     * @param votedFor the candidate id that was voted for.
      */
-    void updateAndPersist(long currentTerm, String votedFor);
+    void updateAndPersist(long term, @Nullable String votedFor);
 }
index e44247d..e123a64 100644 (file)
@@ -15,9 +15,6 @@ import org.slf4j.Logger;
  * Implementation of ElectionTerm for the RaftActor.
  */
 class ElectionTermImpl implements ElectionTerm {
-    /**
-     * Identifier of the actor whose election term information this is
-     */
     private long currentTerm = 0;
     private String votedFor = null;
 
@@ -42,18 +39,17 @@ class ElectionTermImpl implements ElectionTerm {
         return votedFor;
     }
 
-    @Override public void update(long currentTerm, String votedFor) {
-        if(log.isDebugEnabled()) {
-            log.debug("{}: Set currentTerm={}, votedFor={}", logId, currentTerm, votedFor);
-        }
-        this.currentTerm = currentTerm;
-        this.votedFor = votedFor;
+    @Override
+    public void update(long newTerm, String newVotedFor) {
+        log.debug("{}: Set currentTerm={}, votedFor={}", logId, newTerm, newVotedFor);
+        this.currentTerm = newTerm;
+        this.votedFor = newVotedFor;
     }
 
     @Override
-    public void updateAndPersist(long currentTerm, String votedFor){
-        update(currentTerm, votedFor);
+    public void updateAndPersist(long newTerm, String newVotedFor) {
+        update(newTerm, newVotedFor);
         // FIXME : Maybe first persist then update the state
         persistence.persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), NoopProcedure.instance());
     }
-}
\ No newline at end of file
+}
index c0855c7..d40589a 100644 (file)
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.annotations.VisibleForTesting;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
 
 /**
- * The state of the followers log as known by the Leader
+ * The state of the followers log as known by the Leader.
  */
 public interface FollowerLogInformation {
 
     /**
-     * Increment the value of the nextIndex
+     * Increments the value of the follower's next index.
      *
-     * @return the new value of nextIndex
+     * @return the new value of nextIndex.
      */
     long incrNextIndex();
 
     /**
-     * Decrement the value of the nextIndex
+     * Decrements the value of the follower's next index.
      *
-     * @return the new value of nextIndex
+     * @return the new value of nextIndex,
      */
     long decrNextIndex();
 
     /**
-     * Sets the index of the next log entry for this follower.
+     * Sets the index of the follower's next log entry.
      *
-     * @param nextIndex
+     * @param nextIndex the new index.
      * @return true if the new index differed from the current index and the current index was updated, false
      *              otherwise.
      */
     boolean setNextIndex(long nextIndex);
 
     /**
-     * Increment the value of the matchIndex
+     * Increments the value of the follower's match index.
      *
-     * @return the new value of matchIndex
+     * @return the new value of matchIndex.
      */
     long incrMatchIndex();
 
     /**
-     * Sets the index of the highest log entry for this follower.
+     * Sets the index of the follower's highest log entry.
      *
-     * @param matchIndex
+     * @param matchIndex the new index.
      * @return true if the new index differed from the current index and the current index was updated, false
      *              otherwise.
      */
     boolean setMatchIndex(long matchIndex);
 
     /**
+     * Returns the identifier of the follower.
      *
-     * @return the identifier of the follower. This could simply be the url of the remote actor.
+     * @return the identifier of the follower.
      */
     String getId();
 
     /**
-     * @return index of the next log entry to send to that server (initialized to leader last log index + 1)
+     * Returns the index of the next log entry to send to the follower.
+     *
+     * @return index of the follower's next log entry.
      */
     long getNextIndex();
 
     /**
-     * @return index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
+     * Returns the index of highest log entry known to be replicated on the follower.
+     *
+     * @return the index of highest log entry.
      */
     long getMatchIndex();
 
     /**
-     * Checks if the follower is active by comparing the last updated with the duration
+     * Checks if the follower is active by comparing the time of the last activity with the election time out. The
+     * follower is active if some activity has occurred for the follower within the election time out interval.
      *
-     * @return true if follower is active, false otherwise
+     * @return true if follower is active, false otherwise.
      */
     boolean isFollowerActive();
 
     /**
-     * restarts the timeout clock of the follower
+     * Marks the follower as active. This should be called when some activity has occurred for the follower.
      */
     void markFollowerActive();
 
     /**
-     * This will stop the timeout clock
+     * Marks the follower as inactive. This should only be called from unit tests.
      */
+    @VisibleForTesting
     void markFollowerInActive();
 
 
     /**
-     * This will return the active time of follower, since it was last reset
+     * Returns the time since the last activity occurred for the follower.
      *
-     * @return time in milliseconds since the last activity from the follower
+     * @return time in milliseconds since the last activity from the follower.
      */
     long timeSinceLastActivity();
 
     /**
-     * This method checks if it is ok to replicate
+     * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid
+     * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
+     * yet within the current heart beat interval
      *
      * @return true if it is ok to replicate, false otherwise
      */
     boolean okToReplicate();
 
     /**
-     * @return the payload data version of the follower.
+     * Returns the log entry payload data version of the follower.
+     *
+     * @return the payload data version.
      */
     short getPayloadVersion();
 
     /**
      * Sets the payload data version of the follower.
+     *
+     * @param payloadVersion the payload data version.
      */
     void setPayloadVersion(short payloadVersion);
 
     /**
+     * Returns the the raft version of the follower.
+     *
      * @return the raft version of the follower.
      */
     short getRaftVersion();
 
     /**
      * Sets the raft version of the follower.
+     *
+     * @param raftVersion the raft version.
      */
-    void setRaftVersion(short payloadVersion);
+    void setRaftVersion(short raftVersion);
 
     /**
      * Returns the LeaderInstallSnapshotState for the in progress install snapshot.
index 9d58288..f101635 100644 (file)
@@ -15,6 +15,12 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
 
+/**
+ * Implementation of the FollowerLogInformation interface.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
 public class FollowerLogInformationImpl implements FollowerLogInformation {
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
 
@@ -40,6 +46,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private LeaderInstallSnapshotState installSnapshotState;
 
+    /**
+     * Constructs an instance.
+     *
+     * @param peerInfo the associated PeerInfo of the follower.
+     * @param matchIndex the initial match index.
+     * @param context the RaftActorContext.
+     */
     public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
         this.nextIndex = context.getCommitIndex();
         this.matchIndex = matchIndex;
@@ -59,7 +72,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean setNextIndex(long nextIndex) {
-        if(this.nextIndex != nextIndex) {
+        if (this.nextIndex != nextIndex) {
             this.nextIndex = nextIndex;
             return true;
         }
@@ -68,13 +81,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     }
 
     @Override
-    public long incrMatchIndex(){
+    public long incrMatchIndex() {
         return matchIndex++;
     }
 
     @Override
     public boolean setMatchIndex(long matchIndex) {
-        if(this.matchIndex != matchIndex) {
+        if (this.matchIndex != matchIndex) {
             this.matchIndex = matchIndex;
             return true;
         }
@@ -99,13 +112,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean isFollowerActive() {
-        if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+        if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
             return false;
         }
 
         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-        return (stopwatch.isRunning()) &&
-                (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
+        return stopwatch.isRunning()
+                && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis();
     }
 
     @Override
@@ -130,16 +143,14 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean okToReplicate() {
-        if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+        if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
             return false;
         }
 
         // Return false if we are trying to send duplicate data before the heartbeat interval
-        if(getNextIndex() == lastReplicatedIndex){
-            if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
-                    .getHeartBeatInterval().toMillis()){
-                return false;
-            }
+        if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+                < context.getConfigParams().getHeartBeatInterval().toMillis()) {
+            return false;
         }
 
         resetLastReplicated();
@@ -148,7 +159,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private void resetLastReplicated(){
         lastReplicatedIndex = getNextIndex();
-        if(lastReplicatedStopwatch.isRunning()){
+        if (lastReplicatedStopwatch.isRunning()) {
             lastReplicatedStopwatch.reset();
         }
         lastReplicatedStopwatch.start();
@@ -182,7 +193,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
-        if(this.installSnapshotState == null) {
+        if (this.installSnapshotState == null) {
             this.installSnapshotState = Preconditions.checkNotNull(state);
         }
     }
index 73d9d5f..3aac07f 100644 (file)
@@ -41,7 +41,7 @@ class GetSnapshotReplyActor extends UntypedActor {
 
     @Override
     public void onReceive(Object message) {
-        if(message instanceof CaptureSnapshotReply) {
+        if (message instanceof CaptureSnapshotReply) {
             Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(),
                     params.captureSnapshot.getUnAppliedEntries(),
                     params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
@@ -51,7 +51,8 @@ class GetSnapshotReplyActor extends UntypedActor {
 
             LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
 
-            params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)), getSelf());
+            params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)),
+                    getSelf());
             getSelf().tell(PoisonPill.getInstance(), getSelf());
         } else if (message instanceof ReceiveTimeout) {
             LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms",
index 2760b48..56cda03 100644 (file)
@@ -32,12 +32,12 @@ public class ImmutableElectionTerm implements ElectionTerm {
     }
 
     @Override
-    public void update(long currentTerm, String votedFor) {
+    public void update(long newTerm, String newVotedFor) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public void updateAndPersist(long currentTerm, String votedFor) {
+    public void updateAndPersist(long newTerm, String newVotedFor) {
         throw new UnsupportedOperationException();
     }
 
index c1267fa..d494c88 100644 (file)
@@ -13,6 +13,8 @@ import akka.japi.Procedure;
  * An akka Procedure that does nothing.
  *
  * @author Thomas Pantelis
+ *
+ * @param <T> the Procedure type
  */
 public class NoopProcedure<T> implements Procedure<T> {
 
@@ -28,5 +30,6 @@ public class NoopProcedure<T> implements Procedure<T> {
 
     @Override
     public void apply(Object notUsed) {
+        // nothing to do
     }
 }
index 3d15d88..21b56d9 100644 (file)
@@ -17,6 +17,13 @@ public class PeerInfo {
     private String address;
     private VotingState votingState;
 
+    /**
+     * Constructs an instance.
+     *
+     * @param id the id of the peer.
+     * @param address the address of the peer.
+     * @param votingState the VotingState of the peer.
+     */
     public PeerInfo(String id, String address, VotingState votingState) {
         this.id = id;
         this.address = address;
index 57eb264..cb07e59 100644 (file)
@@ -53,8 +53,6 @@ import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
@@ -100,11 +98,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    protected final Logger LOG = LoggerFactory.getLogger(getClass());
-
     /**
      * This context should NOT be passed directly to any other actor it is
-     * only to be consumed by the RaftActorBehaviors
+     * only to be consumed by the RaftActorBehaviors.
      */
     private final RaftActorContextImpl context;
 
@@ -124,7 +120,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private boolean shuttingDown;
 
-    public RaftActor(String id, Map<String, String> peerAddresses,
+    protected RaftActor(String id, Map<String, String> peerAddresses,
          Optional<ConfigParams> configParams, short payloadVersion) {
 
         persistentProvider = new PersistentDataProvider(this);
@@ -133,7 +129,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
-            configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(),
+            configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
             delegatingPersistenceProvider, LOG);
 
         context.setPayloadVersion(payloadVersion);
@@ -159,12 +155,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     protected void handleRecover(Object message) {
-        if(raftRecovery == null) {
+        if (raftRecovery == null) {
             raftRecovery = newRaftActorRecoverySupport();
         }
 
         boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
-        if(recoveryComplete) {
+        if (recoveryComplete) {
             onRecoveryComplete();
 
             initializeBehavior();
@@ -178,7 +174,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     @VisibleForTesting
-    void initializeBehavior(){
+    void initializeBehavior() {
         changeCurrentBehavior(new Follower(context));
     }
 
@@ -229,7 +225,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             long startTime = System.nanoTime();
 
-            if(LOG.isDebugEnabled()) {
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("{}: Applying state for log index {} data {}",
                     persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
                     applyState.getReplicatedLogEntry().getData());
@@ -241,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             }
 
             long elapsedTime = System.nanoTime() - startTime;
-            if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+            if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
                 LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
                         TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
             }
@@ -261,9 +257,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
-            }
+            LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
             persistence().persist(applyEntries, NoopProcedure.instance());
 
@@ -272,19 +266,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
-        } else if(message instanceof GetOnDemandRaftState) {
+        } else if (message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else if(message instanceof InitiateCaptureSnapshot) {
+        } else if (message instanceof InitiateCaptureSnapshot) {
             captureSnapshot();
-        } else if(message instanceof SwitchBehavior) {
+        } else if (message instanceof SwitchBehavior) {
             switchBehavior((SwitchBehavior) message);
-        } else if(message instanceof LeaderTransitioning) {
+        } else if (message instanceof LeaderTransitioning) {
             onLeaderTransitioning();
-        } else if(message instanceof Shutdown) {
+        } else if (message instanceof Shutdown) {
             onShutDown();
-        } else if(message instanceof Runnable) {
+        } else if (message instanceof Runnable) {
             ((Runnable)message).run();
-        } else if(message instanceof NoopPayload) {
+        } else if (message instanceof NoopPayload) {
             persistData(null, null, (NoopPayload)message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
             handleNonRaftCommand(message);
@@ -310,7 +304,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
-        if(leadershipTransferInProgress == null) {
+        if (leadershipTransferInProgress == null) {
             leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
@@ -335,7 +329,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void onShutDown() {
         LOG.debug("{}: onShutDown", persistenceId());
 
-        if(shuttingDown) {
+        if (shuttingDown) {
             return;
         }
 
@@ -380,16 +374,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void onLeaderTransitioning() {
         LOG.debug("{}: onLeaderTransitioning", persistenceId());
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
     }
 
     private void switchBehavior(SwitchBehavior message) {
-        if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+        if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
             RaftState newState = message.getNewState();
-            if( newState == RaftState.Leader || newState == RaftState.Follower) {
+            if ( newState == RaftState.Leader || newState == RaftState.Follower) {
                 switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
                     AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
                 getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
@@ -414,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         Map<String, String> peerAddresses = new HashMap<>();
         Map<String, Boolean> peerVotingStates = new HashMap<>();
-        for(PeerInfo info: context.getPeers()) {
+        for (PeerInfo info: context.getPeers()) {
             peerVotingStates.put(info.getId(), info.isVoting());
             peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
         }
@@ -446,11 +440,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             builder.lastLogTerm(lastLogEntry.getTerm());
         }
 
-        if(getCurrentBehavior() instanceof AbstractLeader) {
+        if (getCurrentBehavior() instanceof AbstractLeader) {
             AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
             Collection<String> followerIds = leader.getFollowerIds();
             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
-            for(String id: followerIds) {
+            for (String id: followerIds) {
                 final FollowerLogInformation info = leader.getFollower(id);
                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
                         info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
@@ -477,24 +471,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         // it can happen that the state has not changed but the leader has changed.
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) ||
-           oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
-            if(roleChangeNotifier.isPresent()) {
+        if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
+                || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
+            if (roleChangeNotifier.isPresent()) {
                 roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
                         currentBehavior.getLeaderPayloadVersion()), getSelf());
             }
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
-            if(leadershipTransferInProgress != null) {
+            if (leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
 
             serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
         }
 
-        if (roleChangeNotifier.isPresent() &&
-                (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
+        if (roleChangeNotifier.isPresent()
+                && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
                     currentBehavior.state().name()), getSelf());
         }
@@ -508,19 +502,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     public long snapshotSequenceNr() {
         // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
         // so that we can delete the persistent journal based on the saved sequence-number
-        // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
-        // was saved and not the number we saved.
-        // We would want to override it , by asking akka to use the last-sequence number known to us.
+        // However , when akka replays the journal during recovery, it replays it from the sequence number when the
+        // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
+        // last-sequence number known to us.
         return context.getSnapshotManager().getLastSequenceNumber();
     }
 
     /**
      * When a derived RaftActor needs to persist something it must call
      * persistData.
-     *
-     * @param clientActor
-     * @param identifier
-     * @param data
      */
     protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
 
@@ -528,14 +518,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
-        }
+        LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
 
         final RaftActorContext raftContext = getRaftActorContext();
 
         replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
-            if (!hasFollowers()){
+            if (!hasFollowers()) {
                 // Increment the Commit Index and the Last Applied values
                 raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
                 raftContext.setLastApplied(replicatedLogEntry1.getIndex());
@@ -576,7 +564,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     /**
      * Derived actors can call the isLeader method to check if the current
-     * RaftActor is the Leader or not
+     * RaftActor is the Leader or not.
      *
      * @return true it this RaftActor is a Leader false otherwise
      */
@@ -585,8 +573,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected final boolean isLeaderActive() {
-        return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader &&
-                !shuttingDown && !isLeadershipTransferInProgress();
+        return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader
+                && !shuttingDown && !isLeadershipTransferInProgress();
     }
 
     private boolean isLeadershipTransferInProgress() {
@@ -600,10 +588,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @return A reference to the leader if known, null otherwise
      */
-    public ActorSelection getLeader(){
+    public ActorSelection getLeader() {
         String leaderAddress = getLeaderAddress();
 
-        if(leaderAddress == null){
+        if (leaderAddress == null) {
             return null;
         }
 
@@ -611,10 +599,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
+     * Returns the id of the current leader.
      *
      * @return the current leader's id
      */
-    protected final String getLeaderId(){
+    protected final String getLeaderId() {
         return getCurrentBehavior().getLeaderId();
     }
 
@@ -623,7 +612,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return getCurrentBehavior().state();
     }
 
-    protected Long getCurrentTerm(){
+    protected Long getCurrentTerm() {
         return context.getTermInformation().getCurrentTerm();
     }
 
@@ -634,10 +623,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected void updateConfigParams(ConfigParams configParams) {
 
         // obtain the RaftPolicy for oldConfigParams and the updated one.
-        String oldRaftPolicy = context.getConfigParams().
-            getCustomRaftPolicyImplementationClass();
-        String newRaftPolicy = configParams.
-            getCustomRaftPolicyImplementationClass();
+        String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
+        String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
 
         LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
             oldRaftPolicy, newRaftPolicy);
@@ -651,7 +638,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 String previousLeaderId = behavior.getLeaderId();
                 short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
 
-                LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+                LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
+                        previousLeaderId);
 
                 changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
             } else {
@@ -670,14 +658,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected void setPersistence(boolean persistent) {
         DataPersistenceProvider currentPersistence = persistence();
-        if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
+        if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
             setPersistence(new PersistentDataProvider(this));
 
-            if(getCurrentBehavior() != null) {
+            if (getCurrentBehavior() != null) {
                 LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
                 captureSnapshot();
             }
-        } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
+        } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
             setPersistence(new NonPersistentDataProvider() {
                 /**
                  * The way snapshotting works is,
@@ -691,7 +679,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                  * </ol>
                  */
                 @Override
-                public void saveSnapshot(Object o) {
+                public void saveSnapshot(Object object) {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
@@ -703,25 +691,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     /**
      * setPeerAddress sets the address of a known peer at a later time.
+     *
      * <p>
      * This is to account for situations where a we know that a peer
      * exists but we do not know an address up-front. This may also be used in
      * situations where a known peer starts off in a different location and we
      * need to change it's address
+     *
      * <p>
      * Note that if the peerId does not match the list of peers passed to
      * this actor during construction an IllegalStateException will be thrown.
-     *
-     * @param peerId
-     * @param peerAddress
      */
-    protected void setPeerAddress(String peerId, String peerAddress){
+    protected void setPeerAddress(String peerId, String peerAddress) {
         context.setPeerAddress(peerId, peerAddress);
     }
 
     /**
      * The applyState method will be called by the RaftActor when some data
-     * needs to be applied to the actor's state
+     * needs to be applied to the actor's state.
      *
      * @param clientActor A reference to the client who sent this message. This
      *                    is the same reference that was passed to persistData
@@ -763,7 +750,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onStateChanged();
 
     /**
-     * Notifier Actor for this RaftActor to notify when a role change happens
+     * Notifier Actor for this RaftActor to notify when a role change happens.
+     *
      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
      */
     protected abstract Optional<ActorRef> getRoleChangeNotifier();
@@ -774,6 +762,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * work prior to performing the operation. On completion of any work, the run method must be called on the
      * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
      * this actor's thread dispatcher as as it modifies internal state.
+     *
      * <p>
      * The default implementation immediately runs the operation.
      *
@@ -784,11 +773,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void onLeaderChanged(String oldLeader, String newLeader) {
-
     }
 
-    private String getLeaderAddress(){
-        if(isLeader()){
+    private String getLeaderAddress() {
+        if (isLeader()) {
             return getSelf().path().toString();
         }
         String leaderId = getLeaderId();
@@ -796,15 +784,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             return null;
         }
         String peerAddress = context.getPeerAddress(leaderId);
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
-                    persistenceId(), leaderId, peerAddress);
-        }
+        LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
 
         return peerAddress;
     }
 
-    protected boolean hasFollowers(){
+    protected boolean hasFollowers() {
         return getRaftActorContext().hasFollowers();
     }
 
@@ -853,10 +838,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     /**
      * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
      */
-    private static abstract class BehaviorState implements Immutable {
+    private abstract static class BehaviorState implements Immutable {
         @Nullable abstract RaftActorBehavior getBehavior();
+
         @Nullable abstract String getLastValidLeaderId();
+
         @Nullable abstract String getLastLeaderId();
+
         @Nullable abstract short getLeaderPayloadVersion();
     }
 
index ba6645f..f9f9478 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.function.LongSupplier;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -31,29 +32,32 @@ import org.slf4j.Logger;
  */
 public interface RaftActorContext {
     /**
-     * Create a new local actor
-     * @param props
-     * @return a reference to the newly created actor
+     * Creates a new local actor.
+     *
+     * @param props the Props used to create the actor.
+     * @return a reference to the newly created actor.
      */
     ActorRef actorOf(Props props);
 
     /**
-     * Create a actor selection
-     * @param path
-     * @return an actor selection for the given actor path
+     * Creates an actor selection.
+     *
+     * @param path the path.
+     * @return an actor selection for the given actor path.
      */
     ActorSelection actorSelection(String path);
 
     /**
-     * Get the identifier for the RaftActor. This identifier represents the
-     * name of the actor whose common state is being shared. For example the
-     * id could be 'inventory'
+     * Returns the identifier for the RaftActor. This identifier represents the
+     * name of the actor whose common state is being shared.
      *
      * @return the identifier
      */
     String getId();
 
     /**
+     * Returns the reference to the RaftActor.
+     *
      * @return A reference to the RaftActor itself. This could be used to send messages
      * to the RaftActor
      */
@@ -62,204 +66,255 @@ public interface RaftActorContext {
     /**
      * The akka Cluster singleton for the actor system if one is configured.
      *
-     * @return an Optional containing the CLuster instance is present.
+     * @return an Optional containing the Cluster instance is present.
      */
     Optional<Cluster> getCluster();
 
     /**
-     * @return the ElectionTerm information
+     * Returns the current ElectionTerm information.
+     *
+     * @return the ElectionTerm.
      */
+    @Nonnull
     ElectionTerm getTermInformation();
 
     /**
-     * @return index of highest log entry known to be committed (initialized to 0, increases monotonically)
+     * Returns the index of highest log entry known to be committed.
+     *
+     * @return index of highest log entry known to be committed.
      */
     long getCommitIndex();
 
 
     /**
+     * Sets the index of highest log entry known to be committed.
+     *
      * @param commitIndex new commit index
      */
     void setCommitIndex(long commitIndex);
 
     /**
-     * @return index of highest log entry applied to state machine (initialized to 0, increases monotonically)
+     * Returns index of highest log entry applied to state machine.
+     *
+     * @return index of highest log entry applied to state machine.
      */
     long getLastApplied();
 
-
     /**
-     * @param lastApplied the index of the last log entry that was applied to the state
+     * Sets index of highest log entry applied to state machine.
+     *
+     * @param lastApplied the new applied index.
      */
     void setLastApplied(long lastApplied);
 
     /**
+     * Sets the ReplicatedLog instance.
      *
-     * @param replicatedLog the replicated log of the current RaftActor
+     * @param replicatedLog the ReplicatedLog instance.
      */
-    void setReplicatedLog(ReplicatedLog replicatedLog);
+    void setReplicatedLog(@Nonnull ReplicatedLog replicatedLog);
 
     /**
-     * @return A representation of the log
+     * Returns the ReplicatedLog instance.
+     *
+     * @return the ReplicatedLog instance.
      */
+    @Nonnull
     ReplicatedLog getReplicatedLog();
 
     /**
-     * @return The ActorSystem associated with this context
+     * Returns the The ActorSystem associated with this context.
+     *
+     * @return the ActorSystem.
      */
+    @Nonnull
     ActorSystem getActorSystem();
 
     /**
-     * @return the logger to be used for logging messages to a log file
+     * Returns the logger to be used for logging messages.
+     *
+     * @return the logger.
      */
+    @Nonnull
     Logger getLogger();
 
     /**
-     * Get the address of the peer as a String. This is the same format in
-     * which a consumer would provide the address
+     * Gets the address of a peer as a String. This is the same format in which a consumer would provide the address.
      *
-     * @param peerId
-     * @return The address of the peer or null if the address has not yet been
-     *         resolved
+     * @param peerId the id of the peer.
+     * @return the address of the peer or null if the address has not yet been resolved.
      */
+    @Nullable
     String getPeerAddress(String peerId);
 
     /**
-     * @param serverCfgPayload
+     * Updates the peers and information to match the given ServerConfigurationPayload.
+     *
+     * @param serverCfgPayload the ServerConfigurationPayload.
      */
     void updatePeerIds(ServerConfigurationPayload serverCfgPayload);
 
     /**
+     * Returns the PeerInfo instances for each peer.
+     *
      * @return list of PeerInfo
      */
+    @Nonnull
     Collection<PeerInfo> getPeers();
 
     /**
-     * @return the list of peer IDs.
+     * Returns the id's for each peer.
+     *
+     * @return the list of peer id's.
      */
+    @Nonnull
     Collection<String> getPeerIds();
 
     /**
-     * Get the PeerInfo for the given peer.
+     * Returns the PeerInfo for the given peer.
      *
      * @param peerId
-     * @return the PeerInfo
+     * @return the PeerInfo or null if not found.
      */
+    @Nullable
     PeerInfo getPeerInfo(String peerId);
 
     /**
-     * Add to actor peers
+     * Adds a new peer.
      *
-     * @param name
-     * @param address
+     * @param id the id of the new peer.
+     * @param address the address of the new peer.
+     * @param votingState the VotingState of the new peer.
      */
-    void addToPeers(String name, String address, VotingState votingState);
+    void addToPeers(String id, String address, VotingState votingState);
 
     /**
+     * Removes a peer.
      *
-     * @param name
+     * @param id the id of the peer to remove.
      */
-    void removePeer(String name);
+    void removePeer(String id);
 
     /**
-     * Given a peerId return the corresponding actor
-     * <p>
+     * Returns an ActorSelection for a peer.
      *
-     *
-     * @param peerId
-     * @return The actorSelection corresponding to the peer or null if the
-     *         address has not yet been resolved
+     * @param peerId the id of the peer.
+     * @return the actorSelection corresponding to the peer or null if the address has not yet been resolved.
      */
+    @Nullable
     ActorSelection getPeerActorSelection(String peerId);
 
     /**
-     * Set Peer Address can be called at a later time to change the address of
-     * a known peer.
+     * Sets the address of a peer.
      *
-     * <p>
-     * Throws an IllegalStateException if the peer is unknown
-     *
-     * @param peerId
-     * @param peerAddress
+     * @param peerId the id of the peer.
+     * @param peerAddress the address of the peer.
      */
     void setPeerAddress(String peerId, String peerAddress);
 
     /**
-     * @return ConfigParams
+     * Returns the ConfigParams instance.
+     *
+     * @return the ConfigParams instance.
      */
+    @Nonnull
     ConfigParams getConfigParams();
 
     /**
+     * Returns the SnapshotManager instance.
      *
-     * @return the SnapshotManager for this RaftActor
+     * @return the SnapshotManager instance.
      */
+    @Nonnull
     SnapshotManager getSnapshotManager();
 
     /**
+     * Returns the DataPersistenceProvider instance.
      *
-     * @return the DataPersistenceProvider for this RaftActor
+     * @return the DataPersistenceProvider instance.
      */
+    @Nonnull
     DataPersistenceProvider getPersistenceProvider();
 
     /**
+     * Determines if there are any peer followers.
      *
-     * @return true if the RaftActor has followers else false
+     * @return true if there are followers otherwise false.
      */
     boolean hasFollowers();
 
     /**
+     * Returns the total available memory for use in calculations. Normally this returns JVM's max memory but can be
+     * overridden for unit tests.
      *
-     * @return the total memory used by the ReplicatedLog
+     * @return the total memory.
      */
     long getTotalMemory();
 
     /**
+     * Sets the retriever of the total memory metric.
      *
-     * @param retriever a supplier of the total memory metric
+     * @param retriever a supplier of the total memory metric.
      */
     @VisibleForTesting
     void setTotalMemoryRetriever(LongSupplier retriever);
 
     /**
+     * Returns the payload version to be used when replicating data.
      *
-     * @return the payload version to be used when replicating data
+     * @return the payload version.
      */
     short getPayloadVersion();
 
     /**
-     * @return an implementation of the RaftPolicy so that the Raft code can be adapted
+     * Returns the RaftPolicy used to determine certain Raft behaviors.
+     *
+     * @return the RaftPolicy instance.
      */
+    @Nonnull
     RaftPolicy getRaftPolicy();
 
     /**
-     * @return true if there are any dynamic server configuration changes available,
-     *  false if static peer configurations are still in use
+     * Determines if there have been any dynamic server configuration changes applied.
+     *
+     * @return true if dynamic server configuration changes have been applied, false otherwise, meaning that static
+     *         peer configuration is still in use.
      */
     boolean isDynamicServerConfigurationInUse();
 
     /**
-     * Configures the dynamic server configurations are avaialble for the RaftActor
+     * Sets that dynamic server configuration changes have been applied.
      */
     void setDynamicServerConfigurationInUse();
 
     /**
-     * @return the RaftActor's peer information as a ServerConfigurationPayload if the
-     * dynamic server configurations are available, otherwise returns null
+     * Returns the peer information as a ServerConfigurationPayload if dynamic server configurations have been applied.
+     *
+     * @param includeSelf include this peer's info.
+     * @return the peer information as a ServerConfigurationPayload or null if no dynamic server configurations have
+     *         been applied.
      */
-    @Nullable ServerConfigurationPayload getPeerServerInfo(boolean includeSelf);
+    @Nullable
+    ServerConfigurationPayload getPeerServerInfo(boolean includeSelf);
 
     /**
-     * @return true if this RaftActor is a voting member of the cluster, false otherwise.
+     * Determines if this peer is a voting member of the cluster.
+     *
+     * @return true if this peer is a voting member, false otherwise.
      */
     boolean isVotingMember();
 
     /**
+     * Determines if there are any voting peers.
+     *
      * @return true if there are any voting peers, false otherwise.
      */
     boolean anyVotingPeers();
 
     /**
-     * @return current behavior attached to the raft actor.
+     * Returns the current behavior attached to the RaftActor.
+     *
+     * @return current behavior.
      */
     RaftActorBehavior getCurrentBehavior();
 }
index 926e974..65567d3 100644 (file)
@@ -32,6 +32,12 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
 
+/**
+ * Implementation of the RaftActorContext interface.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
 public class RaftActorContextImpl implements RaftActorContext {
     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
 
@@ -51,7 +57,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
 
-    private final Logger LOG;
+    private final Logger log;
 
     private ConfigParams configParams;
 
@@ -87,7 +93,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.lastApplied = lastApplied;
         this.configParams = configParams;
         this.persistenceProvider = persistenceProvider;
-        this.LOG = logger;
+        this.log = logger;
 
         for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
@@ -135,7 +141,7 @@ public class RaftActorContextImpl implements RaftActorContext {
                 cluster = Optional.of(Cluster.get(getActorSystem()));
             } catch(Exception e) {
                 // An exception means there's no cluster configured. This will only happen in unit tests.
-                LOG.debug("{}: Could not obtain Cluster: {}", getId(), e);
+                log.debug("{}: Could not obtain Cluster: {}", getId(), e);
                 cluster = Optional.empty();
             }
         }
@@ -182,7 +188,7 @@ public class RaftActorContextImpl implements RaftActorContext {
     }
 
     @Override public Logger getLogger() {
-        return this.LOG;
+        return this.log;
     }
 
     @Override
@@ -202,7 +208,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     @Override
     public String getPeerAddress(String peerId) {
-        String peerAddress = null;
+        String peerAddress;
         PeerInfo peerInfo = peerInfoMap.get(peerId);
         if(peerInfo != null) {
             peerAddress = peerInfo.getAddress();
@@ -247,7 +253,7 @@ public class RaftActorContextImpl implements RaftActorContext {
             votingMember = false;
         }
 
-        LOG.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
+        log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
 
         setDynamicServerConfigurationInUse();
     }
@@ -257,8 +263,8 @@ public class RaftActorContextImpl implements RaftActorContext {
     }
 
     @Override
-    public void addToPeers(String id, String address, VotingState votingState) {
-        peerInfoMap.put(id, new PeerInfo(id, address, votingState));
+    public void addToPeers(String peerId, String address, VotingState votingState) {
+        peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
         numVotingPeers = -1;
     }
 
@@ -284,7 +290,7 @@ public class RaftActorContextImpl implements RaftActorContext {
     public void setPeerAddress(String peerId, String peerAddress) {
         PeerInfo peerInfo = peerInfoMap.get(peerId);
         if(peerInfo != null) {
-            LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
+            log.info("Peer address for peer {} set to {}", peerId, peerAddress);
             peerInfo.setAddress(peerAddress);
         }
     }
@@ -292,7 +298,7 @@ public class RaftActorContextImpl implements RaftActorContext {
     @Override
     public SnapshotManager getSnapshotManager() {
         if(snapshotManager == null){
-            snapshotManager = new SnapshotManager(this, LOG);
+            snapshotManager = new SnapshotManager(this, log);
         }
         return snapshotManager;
     }
@@ -309,7 +315,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     @Override
     public boolean hasFollowers() {
-        return getPeerIds().size() > 0;
+        return !getPeerIds().isEmpty();
     }
 
     @Override
@@ -348,7 +354,7 @@ public class RaftActorContextImpl implements RaftActorContext {
             newConfig.add(new ServerInfo(getId(), votingMember));
         }
 
-        return (new ServerConfigurationPayload(newConfig));
+        return new ServerConfigurationPayload(newConfig);
     }
 
     @Override
@@ -384,7 +390,7 @@ public class RaftActorContextImpl implements RaftActorContext {
             try {
                 currentBehavior.close();
             } catch (Exception e) {
-                LOG.debug("{}: Error closing behavior {}", getId(), currentBehavior.state());
+                log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
             }
         }
     }
index 84e2daf..378f33a 100644 (file)
@@ -31,21 +31,21 @@ class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentData
     }
 
     @Override
-    public <T> void persist(final T o, final Procedure<T> procedure) {
-        if(getDelegate().isRecoveryApplicable()) {
-            super.persist(o, procedure);
+    public <T> void persist(final T object, final Procedure<T> procedure) {
+        if (getDelegate().isRecoveryApplicable()) {
+            super.persist(object, procedure);
         } else {
-            if(o instanceof ReplicatedLogEntry) {
-                Payload payload = ((ReplicatedLogEntry)o).getData();
-                if(payload instanceof PersistentPayload) {
+            if (object instanceof ReplicatedLogEntry) {
+                Payload payload = ((ReplicatedLogEntry)object).getData();
+                if (payload instanceof PersistentPayload) {
                     // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
                     // on recovery if data persistence is later enabled.
-                    persistentProvider.persist(payload, p -> procedure.apply(o));
+                    persistentProvider.persist(payload, p -> procedure.apply(object));
                 } else {
-                    super.persist(o, procedure);
+                    super.persist(object, procedure);
                 }
             } else {
-                super.persist(o, procedure);
+                super.persist(object, procedure);
             }
         }
     }
index 85980e2..7efc758 100644 (file)
@@ -136,12 +136,9 @@ public class RaftActorLeadershipTransferCohort {
         // safely run on the actor's thread dispatcher.
         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
-                new Runnable() {
-                    @Override
-                    public void run() {
-                        LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
-                        finish(true);
-                    }
+                (Runnable) () -> {
+                    LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+                    finish(true);
                 }, raftActor.getContext().system().dispatcher(), raftActor.self());
     }
 
index 30e27e1..c376047 100644 (file)
@@ -20,6 +20,8 @@ public interface RaftActorRecoveryCohort {
     /**
      * This method is called during recovery at the start of a batch of state entries. Derived
      * classes should perform any initialization needed to start a batch.
+     *
+     * @param maxBatchSize the maximum batch size.
      */
     void startLogRecoveryBatch(int maxBatchSize);
 
index 15d98b5..d0217a6 100644 (file)
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
  *
  * @author Thomas Pantelis
  */
+
 class RaftActorRecoverySupport {
     private final RaftActorContext context;
     private final RaftActorRecoveryCohort cohort;
@@ -50,7 +51,7 @@ class RaftActorRecoverySupport {
 
         anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
 
-        if(isMigratedSerializable(message)) {
+        if (isMigratedSerializable(message)) {
             hasMigratedDataRecovered = true;
         }
 
@@ -78,23 +79,23 @@ class RaftActorRecoverySupport {
 
     private void possiblyRestoreFromSnapshot() {
         byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
-        if(restoreFromSnapshot == null) {
+        if (restoreFromSnapshot == null) {
             return;
         }
 
-        if(anyDataRecovered) {
+        if (anyDataRecovered) {
             log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
                     context.getId());
             return;
         }
 
-        try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
             Snapshot snapshot = (Snapshot) ois.readObject();
 
             log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
 
             context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
-        } catch(Exception e) {
+        } catch (Exception e) {
             log.error("{}: Error deserializing snapshot restore", context.getId(), e);
         }
     }
@@ -104,27 +105,25 @@ class RaftActorRecoverySupport {
     }
 
     private void initRecoveryTimer() {
-        if(recoveryTimer == null) {
+        if (recoveryTimer == null) {
             recoveryTimer = Stopwatch.createStarted();
         }
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        if(log.isDebugEnabled()) {
-            log.debug("{}: SnapshotOffer called..", context.getId());
-        }
+        log.debug("{}: SnapshotOffer called..", context.getId());
 
         initRecoveryTimer();
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
-        for(ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
-            if(isMigratedPayload(entry)) {
+        for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
+            if (isMigratedPayload(entry)) {
                 hasMigratedDataRecovered = true;
             }
         }
 
-        if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+        if (!context.getPersistenceProvider().isRecoveryApplicable()) {
             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
             // entries - we don't want to preserve these, only the server config and election term info.
 
@@ -149,7 +148,7 @@ class RaftActorRecoverySupport {
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
 
-            if(isMigratedSerializable(snapshot.getServerConfiguration())) {
+            if (isMigratedSerializable(snapshot.getServerConfiguration())) {
                 hasMigratedDataRecovered = true;
             }
         }
@@ -161,35 +160,35 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
-        if(log.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
                     logEntry.getIndex(), logEntry.size());
         }
 
-        if(isServerConfigurationPayload(logEntry)){
+        if (isServerConfigurationPayload(logEntry)) {
             context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
         }
 
-        if(isMigratedPayload(logEntry)) {
+        if (isMigratedPayload(logEntry)) {
             hasMigratedDataRecovered = true;
         }
 
-        if(context.getPersistenceProvider().isRecoveryApplicable()) {
+        if (context.getPersistenceProvider().isRecoveryApplicable()) {
             replicatedLog().append(logEntry);
-        } else if(!isPersistentPayload(logEntry)) {
+        } else if (!isPersistentPayload(logEntry)) {
             dataRecoveredWithPersistenceDisabled = true;
         }
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
-        if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+        if (!context.getPersistenceProvider().isRecoveryApplicable()) {
             dataRecoveredWithPersistenceDisabled = true;
             return;
         }
 
         long lastUnappliedIndex = context.getLastApplied() + 1;
 
-        if(log.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
             // but the entry itself has made it to that state and recovered via the snapshot
             log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
@@ -199,7 +198,7 @@ class RaftActorRecoverySupport {
         long lastApplied = lastUnappliedIndex - 1;
         for (long i = lastUnappliedIndex; i <= toIndex; i++) {
             ReplicatedLogEntry logEntry = replicatedLog().get(i);
-            if(logEntry != null) {
+            if (logEntry != null) {
                 lastApplied++;
                 batchRecoveredLogEntry(logEntry);
             } else {
@@ -214,7 +213,7 @@ class RaftActorRecoverySupport {
     }
 
     private void onDeleteEntries(DeleteEntries deleteEntries) {
-        if(context.getPersistenceProvider().isRecoveryApplicable()) {
+        if (context.getPersistenceProvider().isRecoveryApplicable()) {
             replicatedLog().removeFrom(deleteEntries.getFromIndex());
         } else {
             dataRecoveredWithPersistenceDisabled = true;
@@ -225,14 +224,14 @@ class RaftActorRecoverySupport {
         initRecoveryTimer();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
-        if(!isServerConfigurationPayload(logEntry)){
-            if(currentRecoveryBatchCount == 0) {
+        if (!isServerConfigurationPayload(logEntry)) {
+            if (currentRecoveryBatchCount == 0) {
                 cohort.startLogRecoveryBatch(batchSize);
             }
 
             cohort.appendRecoveredLogEntry(logEntry.getData());
 
-            if(++currentRecoveryBatchCount >= batchSize) {
+            if (++currentRecoveryBatchCount >= batchSize) {
                 endCurrentLogRecoveryBatch();
             }
         }
@@ -244,26 +243,25 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
-        if(currentRecoveryBatchCount > 0) {
+        if (currentRecoveryBatchCount > 0) {
             endCurrentLogRecoveryBatch();
         }
 
         String recoveryTime = "";
-        if(recoveryTimer != null) {
+        if (recoveryTimer != null) {
             recoveryTimer.stop();
             recoveryTime = " in " + recoveryTimer.toString();
             recoveryTimer = null;
         }
 
-        log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
-                 "Persistence Id =  " + context.getId() +
-                 " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
-                 "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+        log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + "Persistence Id =  "
+                  + context.getId() + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, "
+                  + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
                  replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
-        if(dataRecoveredWithPersistenceDisabled ||
-                hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
-            if(hasMigratedDataRecovered) {
+        if (dataRecoveredWithPersistenceDisabled
+                || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
+            if (hasMigratedDataRecovered) {
                 log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
             } else {
                 log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
@@ -274,14 +272,15 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+                    -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
 
             persistentProvider.saveSnapshot(snapshot);
 
             persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
-        } else if(hasMigratedDataRecovered) {
+        } else if (hasMigratedDataRecovered) {
             log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
 
             context.getSnapshotManager().capture(replicatedLog().last(), -1);
@@ -290,19 +289,19 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
+    private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry) {
         return repLogEntry.getData() instanceof ServerConfigurationPayload;
     }
 
-    private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry){
+    private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry) {
         return repLogEntry.getData() instanceof PersistentPayload;
     }
 
-    private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry){
+    private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry) {
         return isMigratedSerializable(repLogEntry.getData());
     }
 
-    private static boolean isMigratedSerializable(Object message){
+    private static boolean isMigratedSerializable(Object message) {
         return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
     }
 }
index 2fa7002..16c091a 100644 (file)
@@ -343,11 +343,11 @@ class RaftActorServerConfigurationSupport {
         }
 
         @Override
-        public void onNewOperation(ServerOperationContext<?> operationContext) {
+        public void onNewOperation(ServerOperationContext<?> newOperationContext) {
             if(timedOut) {
-                sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+                sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
             } else {
-                super.onNewOperation(operationContext);
+                super.onNewOperation(newOperationContext);
             }
         }
     }
@@ -538,7 +538,7 @@ class RaftActorServerConfigurationSupport {
      *
      * @param <T> the operation type
      */
-    private static abstract class ServerOperationContext<T> {
+    private abstract static class ServerOperationContext<T> {
         private final T operation;
         private final ActorRef clientRequestor;
         private final Identifier contextId;
index 5173dc8..4a17bec 100644 (file)
@@ -71,7 +71,8 @@ class RaftActorSnapshotMessageSupport {
     }
 
     private void onCaptureSnapshotReply(byte[] snapshotBytes) {
-        log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
+        log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(),
+                snapshotBytes.length);
 
         context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory());
     }
@@ -100,7 +101,7 @@ class RaftActorSnapshotMessageSupport {
     private void onGetSnapshot(ActorRef sender) {
         log.debug("{}: onGetSnapshot", context.getId());
 
-        if(context.getPersistenceProvider().isRecoveryApplicable()) {
+        if (context.getPersistenceProvider().isRecoveryApplicable()) {
             CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
                     context.getReplicatedLog().last(), -1, false);
 
@@ -110,7 +111,8 @@ class RaftActorSnapshotMessageSupport {
 
             cohort.createSnapshot(snapshotReplyActor);
         } else {
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+                    -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
 
index e7f1be5..2b527db 100644 (file)
@@ -26,20 +26,28 @@ public interface ReplicatedLog {
      * @return the ReplicatedLogEntry if found, otherwise null if the adjusted index less than 0 or
      * greater than the size of the in-memory journal.
      */
-    @Nullable ReplicatedLogEntry get(long index);
+    @Nullable
+    ReplicatedLogEntry get(long index);
 
     /**
      * Return the last replicated log entry in the log or null of not found.
+     *
+     * @return the last replicated log entry in the log or null of not found.
      */
-    @Nullable ReplicatedLogEntry last();
+    @Nullable
+    ReplicatedLogEntry last();
 
     /**
      * Return the index of the last entry in the log or -1 if the log is empty.
+     *
+     * @return the index of the last entry in the log or -1 if the log is empty.
      */
     long lastIndex();
 
     /**
      * Return the term of the last entry in the log or -1 if the log is empty.
+     *
+     * @return the term of the last entry in the log or -1 if the log is empty.
      */
     long lastTerm();
 
@@ -85,6 +93,12 @@ public interface ReplicatedLog {
      */
     void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
 
+    /**
+     * Appends an entry to the in-memory log and persists it as well.
+     *
+     * @param replicatedLogEntry the entry to append
+     * @param callback the Procedure to be notified when persistence is complete.
+     */
     void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
 
     /**
@@ -182,14 +196,16 @@ public interface ReplicatedLog {
     void snapshotRollback();
 
     /**
-     * Size of the data in the log (in bytes)
+     * Returns the size of the data in the log (in bytes)
+     *
+     * @return the size of the data in the log (in bytes).
      */
     int dataSize();
 
     /**
-     * We decide if snapshot need to be captured based on the count/memory consumed.
-     * @param replicatedLogEntry
+     * Determines if a snapshot need to be captured based on the count/memory consumed.
+     *
+     * @param replicatedLogEntry the last log entry.
      */
     void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry);
-
 }
index 1bced24..a09a0a2 100644 (file)
@@ -11,30 +11,34 @@ package org.opendaylight.controller.cluster.raft;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 /**
- * Represents one entry in the replicated log
+ * Represents one entry in the replicated log.
  */
 public interface ReplicatedLogEntry {
     /**
+     * Returns the payload/data to be replicated.
      *
-     * @return The payload/data to be replicated
+     * @return the payload/data
      */
     Payload getData();
 
     /**
+     * Returns the term of the entry.
      *
-     * @return The term of the entry
+     * @return the term
      */
     long getTerm();
 
     /**
+     * Returns the index of the entry.
      *
-     * @return The index of the entry
+     * @return the index
      */
     long getIndex();
 
     /**
+     * Returns the size of the entry in bytes. An approximate number may be good enough.
      *
-     * @return The size of the entry in bytes. An approximate number may be good enough.
+     * @return the size of the entry in bytes.
      */
     int size();
 }
index c81ecfb..4295633 100644 (file)
@@ -19,11 +19,7 @@ import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     private static final int DATA_SIZE_DIVIDER = 5;
 
-    private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
-        @Override
-        public void apply(final DeleteEntries notUsed) {
-        }
-    };
+    private final Procedure<DeleteEntries> deleteProcedure = NoopProcedure.instance();
 
     private final RaftActorContext context;
     private long dataSizeSinceLastSnapshot = 0L;
@@ -111,19 +107,16 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         // handler. This also holds for multiple persist calls in context
         // of a single command.
         context.getPersistenceProvider().persist(replicatedLogEntry,
-            new Procedure<ReplicatedLogEntry>() {
-                @Override
-                public void apply(final ReplicatedLogEntry param) throws Exception {
-                    context.getLogger().debug("{}: persist complete {}", context.getId(), param);
+            param -> {
+                context.getLogger().debug("{}: persist complete {}", context.getId(), param);
 
-                    int logEntrySize = param.size();
-                    dataSizeSinceLastSnapshot += logEntrySize;
+                int logEntrySize = param.size();
+                dataSizeSinceLastSnapshot += logEntrySize;
 
-                    if (callback != null) {
-                        callback.apply(param);
-                    }
+                if (callback != null) {
+                    callback.apply(param);
                 }
             }
         );
     }
-}
\ No newline at end of file
+}
index e5bf722..1ff69e7 100644 (file)
@@ -22,6 +22,13 @@ public final class ReplicatedLogImplEntry implements ReplicatedLogEntry, Seriali
     private final long term;
     private final Payload payload;
 
+    /**
+     * Constructs an instance.
+     *
+     * @param index the index
+     * @param term the term
+     * @param payload the payload
+     */
     public ReplicatedLogImplEntry(final long index, final long term, final Payload payload) {
         this.index = index;
         this.term = term;
index 2c20041..8655947 100644 (file)
@@ -47,7 +47,7 @@ public class ServerConfigurationPayload extends Payload implements PersistentPay
 
     @Override
     public int size() {
-        if(serializedSize < 0) {
+        if (serializedSize < 0) {
             try {
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream out = new ObjectOutputStream(bos);
index fe4b79a..8a836bf 100644 (file)
@@ -11,6 +11,12 @@ import java.io.Serializable;
 import java.util.List;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 
+/**
+ * Represents a snapshot of the raft data.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
 public class Snapshot implements Serializable {
     private static final long serialVersionUID = -8298574936724056236L;
 
index 3fc43c7..5db4706 100644 (file)
@@ -19,13 +19,19 @@ import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
 
+/**
+ * Manages the capturing of snapshots for a RaftActor.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
 public class SnapshotManager implements SnapshotState {
 
     private final SnapshotState IDLE = new Idle();
     private final SnapshotState PERSISTING = new Persisting();
     private final SnapshotState CREATING = new Creating();
 
-    private final Logger LOG;
+    private final Logger log;
     private final RaftActorContext context;
     private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
             new LastAppliedTermInformationReader();
@@ -42,9 +48,15 @@ public class SnapshotManager implements SnapshotState {
     private ApplySnapshot applySnapshot;
     private Consumer<byte[]> applySnapshotProcedure;
 
+    /**
+     * Constructs an instance.
+     *
+     * @param context the RaftActorContext
+     * @param logger the Logger
+     */
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
-        this.LOG = logger;
+        this.log = logger;
     }
 
     public boolean isApplying() {
@@ -108,14 +120,22 @@ public class SnapshotManager implements SnapshotState {
         return captureSnapshot;
     }
 
-    private boolean hasFollowers(){
+    private boolean hasFollowers() {
         return context.hasFollowers();
     }
 
-    private String persistenceId(){
+    private String persistenceId() {
         return context.getId();
     }
 
+    /**
+     * Constructs a CaptureSnapshot instance.
+     *
+     * @param lastLogEntry the last log entry for the snapshot.
+     * @param replicatedToAllIndex the index of the last entry replicated to all followers.
+     * @param installSnapshotInitiated true if snapshot is initiated to install on a follower.
+     * @return a new CaptureSnapshot instance.
+     */
     public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
             boolean installSnapshotInitiated) {
         TermInformationReader lastAppliedTermInfoReader =
@@ -135,11 +155,11 @@ public class SnapshotManager implements SnapshotState {
 
         long lastLogEntryIndex = lastAppliedIndex;
         long lastLogEntryTerm = lastAppliedTerm;
-        if(lastLogEntry != null) {
+        if (lastLogEntry != null) {
             lastLogEntryIndex = lastLogEntry.getIndex();
             lastLogEntryTerm = lastLogEntry.getTerm();
         } else {
-            LOG.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+            log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
                     persistenceId(), lastAppliedIndex, lastAppliedTerm);
         }
 
@@ -156,54 +176,55 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-            LOG.debug("capture should not be called in state {}", this);
+            log.debug("capture should not be called in state {}", this);
             return false;
         }
 
         @Override
-        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
-            LOG.debug("captureToInstall should not be called in state {}", this);
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
+                String targetFollower) {
+            log.debug("captureToInstall should not be called in state {}", this);
             return false;
         }
 
         @Override
         public void apply(ApplySnapshot snapshot) {
-            LOG.debug("apply should not be called in state {}", this);
+            log.debug("apply should not be called in state {}", this);
         }
 
         @Override
         public void persist(final byte[] snapshotBytes, final long totalMemory) {
-            LOG.debug("persist should not be called in state {}", this);
+            log.debug("persist should not be called in state {}", this);
         }
 
         @Override
         public void commit(final long sequenceNumber, long timeStamp) {
-            LOG.debug("commit should not be called in state {}", this);
+            log.debug("commit should not be called in state {}", this);
         }
 
         @Override
         public void rollback() {
-            LOG.debug("rollback should not be called in state {}", this);
+            log.debug("rollback should not be called in state {}", this);
         }
 
         @Override
         public long trimLog(final long desiredTrimIndex) {
-            LOG.debug("trimLog should not be called in state {}", this);
+            log.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
         protected long doTrimLog(final long desiredTrimIndex) {
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
-            long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+            long tempMin = Math.min(desiredTrimIndex, lastApplied > -1 ? lastApplied - 1 : -1);
 
-            if(LOG.isTraceEnabled()) {
-                LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+            if (log.isTraceEnabled()) {
+                log.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
                         persistenceId(), desiredTrimIndex, lastApplied, tempMin);
             }
 
             if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
-                LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+                log.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
                         context.getTermInformation().getCurrentTerm());
 
                 //use the term of the temp-min, since we check for isPresent, entry will not be null
@@ -214,7 +235,7 @@ public class SnapshotManager implements SnapshotState {
             }
 
             final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-            if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+            if (tempMin > currentBehavior.getReplicatedToAllIndex()) {
                 // It's possible a follower was lagging and an install snapshot advanced its match index past
                 // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
                 // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
@@ -236,16 +257,16 @@ public class SnapshotManager implements SnapshotState {
         private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
             captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
 
-            if(captureSnapshot.isInstallSnapshotInitiated()) {
-                LOG.info("{}: Initiating snapshot capture {} to install on {}",
+            if (captureSnapshot.isInstallSnapshotInitiated()) {
+                log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
-                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+                log.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
             }
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
-            LOG.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
+            log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
 
             SnapshotManager.this.currentState = CREATING;
 
@@ -253,7 +274,7 @@ public class SnapshotManager implements SnapshotState {
                 createSnapshotProcedure.run();
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
-                LOG.error("Error creating snapshot", e);
+                log.error("Error creating snapshot", e);
                 return false;
             }
 
@@ -266,19 +287,20 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
+                String targetFollower) {
             return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
         }
 
         @Override
-        public void apply(ApplySnapshot applySnapshot) {
-            SnapshotManager.this.applySnapshot = applySnapshot;
+        public void apply(ApplySnapshot toApply) {
+            SnapshotManager.this.applySnapshot = toApply;
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
-            LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
+            log.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
 
-            context.getPersistenceProvider().saveSnapshot(applySnapshot.getSnapshot());
+            context.getPersistenceProvider().saveSnapshot(toApply.getSnapshot());
 
             SnapshotManager.this.currentState = PERSISTING;
         }
@@ -310,10 +332,9 @@ public class SnapshotManager implements SnapshotState {
 
             context.getPersistenceProvider().saveSnapshot(snapshot);
 
-            LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
+            log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
 
-            long dataThreshold = totalMemory *
-                    context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+            long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
             boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold;
 
             boolean logSizeExceededSnapshotBatchCount =
@@ -321,15 +342,16 @@ public class SnapshotManager implements SnapshotState {
 
             final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
-                if(LOG.isDebugEnabled()) {
-                    if(dataSizeThresholdExceeded) {
-                        LOG.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}",
+                if (log.isDebugEnabled()) {
+                    if (dataSizeThresholdExceeded) {
+                        log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}",
                                 context.getId(), context.getReplicatedLog().dataSize(), dataThreshold,
                                 captureSnapshot.getLastAppliedIndex());
                     } else {
-                        LOG.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}",
+                        log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}",
                                 context.getId(), context.getReplicatedLog().size(),
-                                context.getConfigParams().getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex());
+                                context.getConfigParams().getSnapshotBatchCount(),
+                                captureSnapshot.getLastAppliedIndex());
                     }
                 }
 
@@ -342,11 +364,11 @@ public class SnapshotManager implements SnapshotState {
 
                 // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
                 // install snapshot to a follower.
-                if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+                if (captureSnapshot.getReplicatedToAllIndex() >= 0) {
                     currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
                 }
 
-            } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+            } else if (captureSnapshot.getReplicatedToAllIndex() != -1) {
                 // clear the log based on replicatedToAllIndex
                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
                         captureSnapshot.getReplicatedToAllTerm());
@@ -361,8 +383,8 @@ public class SnapshotManager implements SnapshotState {
                         context.getReplicatedLog().getSnapshotTerm());
             }
 
-            LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
-                    "and term: {}", context.getId(), context.getReplicatedLog().getSnapshotIndex(),
+            log.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} and term: {}",
+                    context.getId(), context.getReplicatedLog().getSnapshotIndex(),
                     context.getReplicatedLog().getSnapshotTerm());
 
             if (context.getId().equals(currentBehavior.getLeaderId())
@@ -386,9 +408,9 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public void commit(final long sequenceNumber, long timeStamp) {
-            LOG.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
+            log.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
-            if(applySnapshot != null) {
+            if (applySnapshot != null) {
                 try {
                     Snapshot snapshot = applySnapshot.getSnapshot();
 
@@ -398,17 +420,17 @@ public class SnapshotManager implements SnapshotState {
                     context.setCommitIndex(snapshot.getLastAppliedIndex());
                     context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
-                    if(snapshot.getServerConfiguration() != null) {
+                    if (snapshot.getServerConfiguration() != null) {
                         context.updatePeerIds(snapshot.getServerConfiguration());
                     }
 
-                    if(snapshot.getState().length > 0 ) {
+                    if (snapshot.getState().length > 0 ) {
                         applySnapshotProcedure.accept(snapshot.getState());
                     }
 
                     applySnapshot.getCallback().onSuccess();
                 } catch (Exception e) {
-                    LOG.error("{}: Error applying snapshot", context.getId(), e);
+                    log.error("{}: Error applying snapshot", context.getId(), e);
                 }
             } else {
                 context.getReplicatedLog().snapshotCommit();
@@ -425,11 +447,11 @@ public class SnapshotManager implements SnapshotState {
         @Override
         public void rollback() {
             // Nothing to rollback if we're applying a snapshot from the leader.
-            if(applySnapshot == null) {
+            if (applySnapshot == null) {
                 context.getReplicatedLog().snapshotRollback();
 
-                LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
-                        "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+                log.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle."
+                        "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         context.getReplicatedLog().size());
@@ -457,20 +479,21 @@ public class SnapshotManager implements SnapshotState {
 
     private interface TermInformationReader {
         long getIndex();
+
         long getTerm();
     }
 
-    static class LastAppliedTermInformationReader implements TermInformationReader{
+    static class LastAppliedTermInformationReader implements TermInformationReader {
         private long index;
         private long term;
 
-        public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
-                                         ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+        LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex, ReplicatedLogEntry lastLogEntry,
+                boolean hasFollowers) {
             ReplicatedLogEntry entry = log.get(originalIndex);
             this.index = -1L;
             this.term = -1L;
             if (!hasFollowers) {
-                if(lastLogEntry != null) {
+                if (lastLogEntry != null) {
                     // since we have persisted the last-log-entry to persistent journal before the capture,
                     // we would want to snapshot from this entry.
                     index = lastLogEntry.getIndex();
@@ -479,7 +502,7 @@ public class SnapshotManager implements SnapshotState {
             } else if (entry != null) {
                 index = entry.getIndex();
                 term = entry.getTerm();
-            } else if(log.getSnapshotIndex() > -1){
+            } else if (log.getSnapshotIndex() > -1) {
                 index = log.getSnapshotIndex();
                 term = log.getSnapshotTerm();
             }
@@ -487,21 +510,21 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long getIndex(){
+        public long getIndex() {
             return this.index;
         }
 
         @Override
-        public long getTerm(){
+        public long getTerm() {
             return this.term;
         }
     }
 
-    private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+    private static class ReplicatedToAllTermInformationReader implements TermInformationReader {
         private long index;
         private long term;
 
-        ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+        ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex) {
             ReplicatedLogEntry entry = log.get(originalIndex);
             this.index = -1L;
             this.term = -1L;
@@ -515,12 +538,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long getIndex(){
+        public long getIndex() {
             return this.index;
         }
 
         @Override
-        public long getTerm(){
+        public long getTerm() {
             return this.term;
         }
     }
index a6bcddf..639fdee 100644 (file)
@@ -10,6 +10,12 @@ package org.opendaylight.controller.cluster.raft;
 
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 
+/**
+ * Interface for a snapshot phase state.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
 public interface SnapshotState {
     /**
      * @return true when a snapshot is being captured
index 933b134..c58d6f9 100644 (file)
@@ -14,6 +14,7 @@ import scala.concurrent.duration.FiniteDuration;
 /**
  * An abstract class that implements a Runnable operation with a timer such that if the run method isn't
  * invoked within a timeout period, the operation is cancelled via {@link #doCancel}.
+ *
  * <p>
  * <b>Note:</b> this class is not thread safe and is intended for use only within the context of the same
  * actor that's passed on construction. The run method must be called on this actor's thread dispatcher as it
@@ -28,17 +29,13 @@ abstract class TimedRunnable implements Runnable {
     TimedRunnable(FiniteDuration timeout, RaftActor actor) {
         Preconditions.checkNotNull(timeout);
         Preconditions.checkNotNull(actor);
-        cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(), new Runnable() {
-            @Override
-            public void run() {
-                cancel();
-            }
-        }, actor.getContext().system().dispatcher(), actor.self());
+        cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(),
+                (Runnable) () -> cancel(), actor.getContext().system().dispatcher(), actor.self());
     }
 
     @Override
     public void run() {
-        if(canRun) {
+        if (canRun) {
             canRun = false;
             cancelTimer.cancel();
             doRun();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.