Fix warnings and clean up javadocs in sal-akka-raft 11/46511/3
authorTom Pantelis <tpanteli@brocade.com>
Tue, 4 Oct 2016 19:38:58 +0000 (15:38 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 7 Oct 2016 16:09:48 +0000 (16:09 +0000)
Fixed a lot of checkstyle warnings and cleaned up javadocs for the remaining
classes sal-akka-raft. Most of the warnings/changes were for:
 - white space before if/for
 - white space before beginning brace
 - line too long
 - period after first sentence in javadoc
 - missing <p/> in javadoc

Change-Id: I99a3cd08af10d46acecd0e22f04d54b95e2287d9
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
32 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendHeartBeat.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerRemoved.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java

index d4498221638a147dbcd8edf3d17ea46bb895e43b..9fb5554abcdf886bcf86567c3954ee31dfa57667 100644 (file)
@@ -13,7 +13,7 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 
 /**
- * Internal message, issued by follower to its actor
+ * Internal message, issued by follower to its actor.
  */
 public class ApplySnapshot {
     private static final Callback NOOP_CALLBACK = new Callback() {
index 7c182f04e433d63c074598e5344e740f1de31d90..14bd3a0af4c7cbc42304605429057188dce11f18 100644 (file)
@@ -38,7 +38,8 @@ public class CaptureSnapshot {
         this.installSnapshotInitiated = installSnapshotInitiated;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.replicatedToAllTerm = replicatedToAllTerm;
-        this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.<ReplicatedLogEntry>emptyList();
+        this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries :
+            Collections.<ReplicatedLogEntry>emptyList();
     }
 
     public long getLastAppliedIndex() {
@@ -80,7 +81,8 @@ public class CaptureSnapshot {
                 .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
                 .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
-                .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]");
+                .append(replicatedToAllTerm).append(", unAppliedEntries size=")
+                .append(unAppliedEntries.size()).append("]");
         return builder.toString();
     }
 
index d2100cb950654785e748fa324431651ff44b46a0..00ef63cf36fb76803c868a569f32e53057ae9d33 100644 (file)
@@ -13,8 +13,7 @@ import javax.annotation.Nonnull;
 
 /**
  * The FollowerInitialSyncUpStatus is sent by a Follower to inform any RaftActor subclass whether the Follower
- * is at least at the same commitIndex as the Leader was when it sent the follower the very first heartbeat.
- *
+ * is at least at the same commitIndex as the Leader was when it sent the follower the very first heart beat.
  * This status can be used to determine if a Follower has caught up with the current Leader in an upgrade scenario
  * for example.
  */
index 5e1f20b97d47fd27f917a69ec55416b4f326cdd4..ba1c157d37241f2274c9c7a9ed307f281a282a82 100644 (file)
@@ -11,10 +11,7 @@ package org.opendaylight.controller.cluster.raft.base.messages;
 import java.io.Serializable;
 
 /**
- * This messages is sent to the Leader to prompt it to send a heartbeat
- * to it's followers.
- *
- * Typically the Leader to itself on a schedule
+ * This messages is sent via a schedule to the Leader to prompt it to send a heart beat to its followers.
  */
 public final class SendHeartBeat implements Serializable {
     private static final long serialVersionUID = 1L;
index 94e31846eebf1d947d483585ce19ec2b3a3f5ae7..de33b8c95b1ad0679d6a2af3c37e880dca099efa 100644 (file)
@@ -13,8 +13,8 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 
 /**
- * Internal message sent from the SnapshotManager to its associated leader. The leader is expected to apply the
- * {@link Snapshot} to its state.
+ * Internal message sent from the SnapshotManager to its associated leader when a snapshot capture is complete to
+ * prompt the leader to install the snapshot on its followers as needed.
  */
 public final class SendInstallSnapshot {
     private final Snapshot snapshot;
@@ -23,7 +23,8 @@ public final class SendInstallSnapshot {
         this.snapshot = Preconditions.checkNotNull(snapshot);
     }
 
-    @Nonnull public Snapshot getSnapshot() {
+    @Nonnull
+    public Snapshot getSnapshot() {
         return snapshot;
     }
 }
index 9596bb388189e6996a56328bed777b6fc8c51389..162878f3fd4166027e629b563806480b4b4273c0 100644 (file)
@@ -29,10 +29,6 @@ public class SwitchBehavior {
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("SwitchBehavior{");
-        sb.append("newState=").append(newState);
-        sb.append(", newTerm=").append(newTerm);
-        sb.append('}');
-        return sb.toString();
+        return "SwitchBehavior [newState=" + newState + ", newTerm=" + newTerm + "]";
     }
 }
index 479ca5ae7b3c40454daac580f9ebc561a11ea3d9..fab1714989a99407f3d51452d33ef313ac2c14e8 100644 (file)
@@ -60,15 +60,14 @@ import scala.concurrent.duration.FiniteDuration;
  * respond after entry applied to state machine (§5.3)
  * <li> If last log index â‰¥ nextIndex for a follower: send
  * AppendEntries RPC with log entries starting at nextIndex
- * <ul>
  * <li> If successful: update nextIndex and matchIndex for
  * follower (§5.3)
  * <li> If AppendEntries fails because of log inconsistency:
  * decrement nextIndex and retry (§5.3)
- * </ul>
  * <li> If there exists an N such that N > commitIndex, a majority
  * of matchIndex[i] â‰¥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, Â§5.4).
+ * </ul>
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
@@ -76,7 +75,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     /**
      * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
      * expect the entries to be modified in sequence, hence we open-code the lookup.
-     *
      * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
      *       but we already expect those to be far from frequent.
      */
@@ -90,18 +88,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             @Nullable AbstractLeader initializeFromLeader) {
         super(context, state);
 
-        if(initializeFromLeader != null) {
+        if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
             snapshot = initializeFromLeader.snapshot;
             trackers.addAll(initializeFromLeader.trackers);
         } else {
-            for(PeerInfo peerInfo: context.getPeers()) {
+            for (PeerInfo peerInfo: context.getPeers()) {
                 FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
                 followerToLog.put(peerInfo.getId(), followerLogInformation);
             }
         }
 
-        LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
+        log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
 
         updateMinReplicaCount();
 
@@ -133,7 +131,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 context.getPeerInfo(followerId), -1, context);
         followerToLog.put(followerId, followerLogInformation);
 
-        if(heartbeatSchedule == null) {
+        if (heartbeatSchedule == null) {
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         }
     }
@@ -144,8 +142,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     public void updateMinReplicaCount() {
         int numVoting = 0;
-        for(PeerInfo peer: context.getPeers()) {
-            if(peer.isVoting()) {
+        for (PeerInfo peer: context.getPeers()) {
+            if (peer.isVoting()) {
                 numVoting++;
             }
         }
@@ -153,7 +151,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         minReplicationCount = getMajorityVoteCount(numVoting);
     }
 
-    protected int getMinIsolatedLeaderPeerCount(){
+    protected int getMinIsolatedLeaderPeerCount() {
       //the isolated Leader peer count will be 1 less than the majority vote count.
         //this is because the vote count has the self vote counted in it
         //for e.g
@@ -161,12 +159,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
         //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
 
-        return minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
+        return minReplicationCount > 0 ? minReplicationCount - 1 : 0;
     }
 
     @VisibleForTesting
     void setSnapshot(@Nullable Snapshot snapshot) {
-        if(snapshot != null) {
+        if (snapshot != null) {
             this.snapshot = Optional.of(new SnapshotHolder(snapshot));
         } else {
             this.snapshot = Optional.absent();
@@ -182,30 +180,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
-        LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+        log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
 
         return this;
     }
 
     @Override
     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
-        }
+        log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
 
         // Update the FollowerLogInformation
         String followerId = appendEntriesReply.getFollowerId();
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
 
-        if(followerLogInformation == null){
-            LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
+        if (followerLogInformation == null) {
+            log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
             return this;
         }
 
-        if(followerLogInformation.timeSinceLastActivity() >
-                context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
-            LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " +
-                            "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
+        if (followerLogInformation.timeSinceLastActivity()
+                context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
+            log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
+                    + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
                     logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
                     context.getLastApplied(), context.getCommitIndex());
         }
@@ -217,7 +213,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
         long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
         boolean updated = false;
-        if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
+        if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
             // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
             // in raft as a node cannot become leader if it's log is behind another's. However, the
             // non-voting semantics deviate a bit from raft. Only voting members participate in
@@ -231,9 +227,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // snapshot. It's also possible that the follower's last log index is behind the leader's.
             // However in this case the log terms won't match and the logs will conflict - this is handled
             // elsewhere.
-            LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot",
-                    logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
-                    context.getReplicatedLog().lastIndex());
+            log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
+                    + "forcing install snaphot", logName(), followerLogInformation.getId(),
+                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
 
             followerLogInformation.setMatchIndex(-1);
             followerLogInformation.setNextIndex(-1);
@@ -242,8 +238,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             updated = true;
         } else if (appendEntriesReply.isSuccess()) {
-            if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 &&
-                    followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
+            if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
+                    && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
                 // The follower's last entry is present in the leader's journal but the terms don't match so the
                 // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
                 // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
@@ -255,17 +251,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInformation.setNextIndex(followerLastLogIndex - 1);
                 updated = true;
 
-                LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " +
-                          "leader's {} - set the follower's next index to {}",
-                        logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
+                log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+                        + "leader's {} - set the follower's next index to {}", logName(),
+                        followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
                         followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
             } else {
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
             }
         } else {
-            LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+            log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
 
-            if(appendEntriesReply.isForceInstallSnapshot()) {
+            if (appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
                 // in common with this Leader and so would require a snapshot to be installed
                 followerLogInformation.setMatchIndex(-1);
@@ -273,8 +269,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Force initiate a snapshot capture
                 initiateCaptureSnapshot(followerId);
-            } else if(followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 &&
-                    followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
+            } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0
+                    && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
                 // The follower's log is empty or the last entry is present in the leader's journal
                 // and the terms match so the follower is just behind the leader's journal from
                 // the last snapshot, if any. We'll catch up the follower quickly by starting at the
@@ -288,7 +284,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInformation.decrNextIndex();
                 updated = true;
 
-                LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
+                log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
                         logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
                         followerLogInformation.getNextIndex());
             }
@@ -298,34 +294,36 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         // If there exists an N such that N > commitIndex, a majority
         // of matchIndex[i] â‰¥ N, and log[N].term == currentTerm:
         // set commitIndex = N (§5.3, Â§5.4).
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+        if (log.isTraceEnabled()) {
+            log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
                     logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
         }
 
-        for (long N = context.getCommitIndex() + 1; ; N++) {
+        for (long index = context.getCommitIndex() + 1; ; index++) {
             int replicatedCount = 1;
 
-            LOG.trace("{}: checking Nth index {}", logName(), N);
+            log.trace("{}: checking Nth index {}", logName(), index);
             for (FollowerLogInformation info : followerToLog.values()) {
                 final PeerInfo peerInfo = context.getPeerInfo(info.getId());
-                if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) {
+                if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) {
                     replicatedCount++;
-                } else if(LOG.isTraceEnabled()) {
-                    LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+                } else if (log.isTraceEnabled()) {
+                    log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
                             info.getMatchIndex(), peerInfo);
                 }
             }
 
-            if(LOG.isTraceEnabled()) {
-                LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount);
+            if (log.isTraceEnabled()) {
+                log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount,
+                        minReplicationCount);
             }
 
             if (replicatedCount >= minReplicationCount) {
-                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
+                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
                 if (replicatedLogEntry == null) {
-                    LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
-                            logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size());
+                    log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+                            logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+                            context.getReplicatedLog().size());
                     break;
                 }
 
@@ -335,23 +333,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // reach consensus, as per Â§5.4.1: "once an entry from the current term is committed by
                 // counting replicas, then all prior entries are committed indirectly".
                 if (replicatedLogEntry.getTerm() == currentTerm()) {
-                    LOG.trace("{}: Setting commit index to {}", logName(), N);
-                    context.setCommitIndex(N);
+                    log.trace("{}: Setting commit index to {}", logName(), index);
+                    context.setCommitIndex(index);
                 } else {
-                    LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}",
-                            logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
+                    log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, "
+                            + "term {} does not match the current term {}", logName(), index,
+                            replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
                 }
             } else {
-                LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
+                log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
                 break;
             }
         }
 
         // Apply the change to the state machine
         if (context.getCommitIndex() > context.getLastApplied()) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
-                        logName(), followerId, context.getCommitIndex(), context.getLastApplied());
+            if (log.isDebugEnabled()) {
+                log.debug(
+                    "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
+                    logName(), followerId, context.getCommitIndex(), context.getLastApplied());
             }
 
             applyLogToStateMachine(context.getCommitIndex());
@@ -372,10 +372,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
         updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
 
-        if(updated && LOG.isDebugEnabled()) {
-            LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
-                    logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
-                    followerLogInformation.getNextIndex());
+        if (updated && log.isDebugEnabled()) {
+            log.debug(
+                "{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
+                logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
+                followerLogInformation.getNextIndex());
         }
         return updated;
     }
@@ -425,7 +426,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+                log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
@@ -438,7 +439,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             beforeSendHeartbeat();
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
-        } else if(message instanceof SendInstallSnapshot) {
+        } else if (message instanceof SendInstallSnapshot) {
             // received from RaftActor
             setSnapshot(((SendInstallSnapshot) message).getSnapshot());
             sendInstallSnapshot();
@@ -454,20 +455,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
-        LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
+        log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
 
         String followerId = reply.getFollowerId();
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-        if(followerLogInformation == null) {
+        if (followerLogInformation == null) {
             // This can happen during AddServer if it times out.
-            LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+            log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
                     logName(), followerId);
             return;
         }
 
         LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
         if (installSnapshotState == null) {
-            LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
+            log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
                     logName(), followerId);
             return;
         }
@@ -477,22 +478,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
             boolean wasLastChunk = false;
             if (reply.isSuccess()) {
-                if(installSnapshotState.isLastChunk(reply.getChunkIndex())) {
+                if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: InstallSnapshotReply received, " +
-                                "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}",
-                                logName(), reply.getChunkIndex(), followerId,
-                            context.getReplicatedLog().getSnapshotIndex() + 1
-                        );
-                    }
+                    log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
+                            + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
+                            context.getReplicatedLog().getSnapshotIndex() + 1);
 
                     long followerMatchIndex = snapshot.get().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     followerLogInformation.clearLeaderInstallSnapshotState();
 
-                    LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
+                    log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
                         logName(), followerId, followerLogInformation.getMatchIndex(),
                         followerLogInformation.getNextIndex());
 
@@ -503,17 +500,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     }
 
                     wasLastChunk = true;
-                    if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
+                    if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
                         UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
                                              new UnInitializedFollowerSnapshotReply(followerId);
                         context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
-                        LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self");
+                        log.debug("Sent message UnInitializedFollowerSnapshotReply to self");
                     }
                 } else {
                     installSnapshotState.markSendStatus(true);
                 }
             } else {
-                LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+                log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
                         logName(), reply.getChunkIndex());
 
                 installSnapshotState.markSendStatus(false);
@@ -524,17 +521,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 purgeInMemoryLog();
             } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
                 ActorSelection followerActor = context.getPeerActorSelection(followerId);
-                if(followerActor != null) {
+                if (followerActor != null) {
                     sendSnapshotChunk(followerActor, followerLogInformation);
                 }
             }
 
         } else {
-            LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+            log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
                     logName(), reply.getChunkIndex(), followerId,
                     installSnapshotState.getChunkIndex());
 
-            if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
+            if (reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) {
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
                 // so that Installing the snapshot can resume from the beginning
                 installSnapshotState.reset();
@@ -543,8 +540,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private boolean anyFollowersInstallingSnapshot() {
-        for(FollowerLogInformation info: followerToLog.values()) {
-            if(info.getInstallSnapshotState() != null) {
+        for (FollowerLogInformation info: followerToLog.values()) {
+            if (info.getInstallSnapshotState() != null) {
                 return true;
             }
 
@@ -556,12 +553,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
+        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
                 replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
 
         // Create a tracker entry we will use this later to notify the
         // client actor
-        if(replicate.getClientActor() != null) {
+        if (replicate.getClientActor() != null) {
             trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
                     logIndex));
         }
@@ -569,7 +566,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         boolean applyModificationToState = !context.anyVotingPeers()
                 || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
 
-        if(applyModificationToState){
+        if (applyModificationToState) {
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
         }
@@ -585,20 +582,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             final String followerId = e.getKey();
             final FollowerLogInformation followerLogInformation = e.getValue();
             // This checks helps not to send a repeat message to the follower
-            if(!followerLogInformation.isFollowerActive() ||
-                    followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
+            if (!followerLogInformation.isFollowerActive()
+                    || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
                 sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
             }
         }
     }
 
     /**
-     *
      * This method checks if any update needs to be sent to the given follower. This includes append log entries,
      * sending next snapshot chunk, and initiating a snapshot.
+     *
      * @return true if any update is sent, false otherwise
      */
-
     private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
                                        boolean sendHeartbeat, boolean isHeartbeat) {
 
@@ -614,7 +610,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // if install snapshot is in process , then sent next chunk if possible
                 if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
                     sendSnapshotChunk(followerActor, followerLogInformation);
-                } else if(sendHeartbeat) {
+                } else if (sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
                 }
@@ -622,17 +618,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
 
-                if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
-                    LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
-                            logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+                if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) {
+                    log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, "
+                            + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive,
+                            followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
                 }
 
                 if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
 
-                    LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
+                    log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
                             followerNextIndex, followerId);
 
-                    if(followerLogInformation.okToReplicate()) {
+                    if (followerLogInformation.okToReplicate()) {
                         // Try to send all the entries in the journal but not exceeding the max data size
                         // for a single AppendEntries message.
                         int maxEntries = (int) context.getReplicatedLog().size();
@@ -640,16 +637,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                                 context.getConfigParams().getSnapshotChunkSize());
                         sendAppendEntries = true;
                     }
-                } else if (isFollowerActive && followerNextIndex >= 0 &&
-                    leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
+                } else if (isFollowerActive && followerNextIndex >= 0
+                        && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
                     // if the followers next index is not present in the leaders log, and
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
 
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " +
-                                    "follower-nextIndex: %d, leader-snapshot-index: %d,  " +
-                                    "leader-last-index: %d", logName(), followerId,
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
+                                    + "follower-nextIndex: %d, leader-snapshot-index: %d,  "
+                                    "leader-last-index: %d", logName(), followerId,
                                     followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
                     }
 
@@ -659,7 +656,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         initiateCaptureSnapshot(followerId);
                     }
 
-                } else if(sendHeartbeat) {
+                } else if (sendHeartbeat) {
                     // we send an AppendEntries, even if the follower is inactive
                     // in-order to update the followers timestamp, in case it becomes active again
                     sendAppendEntries = true;
@@ -667,7 +664,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             }
 
-            if(sendAppendEntries) {
+            if (sendAppendEntries) {
                 sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
             }
         }
@@ -692,8 +689,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             getLogEntryTerm(followerNextIndex - 1), entries,
             leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
 
-        if(!entries.isEmpty() || LOG.isTraceEnabled()) {
-            LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
+        if (!entries.isEmpty() || log.isTraceEnabled()) {
+            log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
                     appendEntries);
         }
 
@@ -702,7 +699,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     /**
      * Initiates a snapshot capture to install on a follower.
-     *
+     * <p/>
      * Install Snapshot works as follows
      *   1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
      *   2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
@@ -731,7 +728,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         } else {
             boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
                     this.getReplicatedToAllIndex(), followerId);
-            if(captureInitiated) {
+            if (captureInitiated) {
                 followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
                         context.getConfigParams().getSnapshotChunkSize(), logName()));
             }
@@ -740,19 +737,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
-    private boolean canInstallSnapshot(long nextIndex){
+    private boolean canInstallSnapshot(long nextIndex) {
         // If the follower's nextIndex is -1 then we might as well send it a snapshot
         // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
         // in the snapshot
-        return nextIndex == -1 ||
-                (!context.getReplicatedLog().isPresent(nextIndex)
-                        && context.getReplicatedLog().isInSnapshot(nextIndex));
+        return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
+                && context.getReplicatedLog().isInSnapshot(nextIndex);
 
     }
 
 
     private void sendInstallSnapshot() {
-        LOG.debug("{}: sendInstallSnapshot", logName());
+        log.debug("{}: sendInstallSnapshot", logName());
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             String followerId = e.getKey();
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -760,9 +756,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             if (followerActor != null) {
                 long nextIndex = followerLogInfo.getNextIndex();
-                if (followerLogInfo.getInstallSnapshotState() != null ||
-                        context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
-                        canInstallSnapshot(nextIndex)) {
+                if (followerLogInfo.getInstallSnapshotState() != null
+                        || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED
+                        || canInstallSnapshot(nextIndex)) {
                     sendSnapshotChunk(followerActor, followerLogInfo);
                 }
             }
@@ -787,12 +783,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
-            LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+            log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
                     nextSnapshotChunk.length);
 
             int nextChunkIndex = installSnapshotState.incrementChunkIndex();
             Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-            if(installSnapshotState.isLastChunk(nextChunkIndex)) {
+            if (installSnapshotState.isLastChunk(nextChunkIndex)) {
                 serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
             }
 
@@ -809,17 +805,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 actor()
             );
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                        logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
-                        installSnapshotState.getTotalChunks());
-            }
+            log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+                    installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
         }
     }
 
     private void sendHeartBeat() {
         if (!followerToLog.isEmpty()) {
-            LOG.trace("{}: Sending heartbeat", logName());
+            log.trace("{}: Sending heartbeat", logName());
             sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
         }
     }
@@ -868,7 +861,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         int minPresent = getMinIsolatedLeaderPeerCount();
         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
             final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
-            if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
+            if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
                 --minPresent;
                 if (minPresent == 0) {
                     return false;
index 375b3779b891dd94e1a3b702f9396853478de265..5c5c520761ecc4fba107cf5274a95292dc207273 100644 (file)
@@ -28,41 +28,43 @@ import org.slf4j.Logger;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * Abstract class that represents the behavior of a RaftActor
- * <p/>
- * All Servers:
- * <ul>
- * <li> If commitIndex > lastApplied: increment lastApplied, apply
- * log[lastApplied] to state machine (§5.3)
- * <li> If RPC request or response contains term T > currentTerm:
- * set currentTerm = T, convert to follower (§5.1)
+ * Abstract class that provides common code for a RaftActor behavior.
  */
 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     /**
-     * Information about the RaftActor whose behavior this class represents
+     * Information about the RaftActor whose behavior this class represents.
      */
     protected final RaftActorContext context;
 
     /**
-     *
+     * Used for message logging.
      */
-    protected final Logger LOG;
+    protected final Logger log;
 
     /**
-     *
+     * Prepended to log messages to provide appropriate context.
      */
-    private Cancellable electionCancel = null;
-
-    private long replicatedToAllIndex = -1;
-
     private final String logName;
 
+    /**
+     * The RaftState corresponding to his behavior.
+     */
     private final RaftState state;
 
+    /**
+     * Used to cancel a scheduled election.
+     */
+    private Cancellable electionCancel = null;
+
+    /**
+     * The index of the last log entry that has been replicated to all raft peers.
+     */
+    private long replicatedToAllIndex = -1;
+
     AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) {
         this.context = Preconditions.checkNotNull(context);
         this.state = Preconditions.checkNotNull(state);
-        this.LOG = context.getLogger();
+        this.log = context.getLogger();
 
         logName = String.format("%s (%s)", context.getId(), state);
     }
@@ -118,29 +120,22 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries);
 
-
     /**
-     * appendEntries first processes the AppendEntries message and then
-     * delegates handling to a specific behavior
+     * Handles the common logic for the AppendEntries message and delegates handling to the derived class.
      *
-     * @param sender
-     * @param appendEntries
+     * @param sender the ActorRef that sent the message
+     * @param appendEntries the message
      * @return a new behavior if it was changed or the current behavior
      */
-    protected RaftActorBehavior appendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
+    protected RaftActorBehavior appendEntries(ActorRef sender, AppendEntries appendEntries) {
 
         // 1. Reply false if term < currentTerm (§5.1)
         if (appendEntries.getTerm() < currentTerm()) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
-                        logName(), appendEntries.getTerm(), currentTerm());
-            }
+            log.debug("{}: Cannot append entries because sender term {} is less than {}", logName(),
+                    appendEntries.getTerm(), currentTerm());
 
-            sender.tell(
-                new AppendEntriesReply(context.getId(), currentTerm(), false,
-                    lastIndex(), lastTerm(), context.getPayloadVersion()), actor()
-            );
+            sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
+                    context.getPayloadVersion()), actor());
             return this;
         }
 
@@ -164,33 +159,32 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         AppendEntriesReply appendEntriesReply);
 
     /**
-     * requestVote handles the RequestVote message. This logic is common
-     * for all behaviors
+     * Handles the logic for the RequestVote message that is common for all behaviors.
      *
-     * @param sender
-     * @param requestVote
+     * @param sender the ActorRef that sent the message
+     * @param requestVote the message
      * @return a new behavior if it was changed or the current behavior
      */
     protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) {
 
-        LOG.debug("{}: In requestVote:  {}", logName(), requestVote);
+        log.debug("{}: In requestVote:  {}", logName(), requestVote);
 
         boolean grantVote = canGrantVote(requestVote);
 
-        if(grantVote) {
+        if (grantVote) {
             context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId());
         }
 
         RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
 
-        LOG.debug("{}: requestVote returning: {}", logName(), reply);
+        log.debug("{}: requestVote returning: {}", logName(), reply);
 
         sender.tell(reply, actor());
 
         return this;
     }
 
-    protected boolean canGrantVote(RequestVote requestVote){
+    protected boolean canGrantVote(RequestVote requestVote) {
         boolean grantVote = false;
 
         //  Reply false if term < currentTerm (§5.1)
@@ -213,7 +207,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             // more up-to-date.
             if (requestVote.getLastLogTerm() > lastTerm()) {
                 candidateLatest = true;
-            } else if ((requestVote.getLastLogTerm() == lastTerm())
+            } else if (requestVote.getLastLogTerm() == lastTerm()
                     && requestVote.getLastLogIndex() >= lastIndex()) {
                 candidateLatest = true;
             }
@@ -241,6 +235,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         RequestVoteReply requestVoteReply);
 
     /**
+     * Returns a duration for election with an additional variance for randomness.
      *
      * @return a random election duration
      */
@@ -251,7 +246,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
-     * stop the scheduled election
+     * Stops the currently scheduled election.
      */
     protected void stopElection() {
         if (electionCancel != null && !electionCancel.isCancelled()) {
@@ -264,7 +259,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
-     * schedule a new election
+     * Schedule a new election.
      *
      * @param interval the duration after which we should trigger a new election
      */
@@ -277,6 +272,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
+     * Returns the current election term.
+     *
      * @return the current term
      */
     protected long currentTerm() {
@@ -284,6 +281,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
+     * Returns the id of the candidate that this server voted for in current term.
+     *
      * @return the candidate for whom we voted in the current term
      */
     protected String votedFor() {
@@ -291,46 +290,53 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
-     * @return the actor associated with this behavior
+     * Returns the actor associated with this behavior.
+     *
+     * @return the actor
      */
     protected ActorRef actor() {
         return context.getActor();
     }
 
     /**
+     * Returns the term of the last entry in the log.
      *
-     * @return the term from the last entry in the log
+     * @return the term
      */
     protected long lastTerm() {
         return context.getReplicatedLog().lastTerm();
     }
 
     /**
-     * @return the index from the last entry in the log
+     * Returns the index of the last entry in the log.
+     *
+     * @return the index
      */
     protected long lastIndex() {
         return context.getReplicatedLog().lastIndex();
     }
 
     /**
-     * @param logIndex
-     * @return the client request tracker for the specified logIndex
+     * Removes and returns the ClientRequestTracker for the specified log index.
+     * @param logIndex the log index
+     * @return the ClientRequestTracker or null if none available
      */
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
         return null;
     }
 
     /**
+     * Returns the actual index of the entry in replicated log for the given index or -1 if not found.
      *
-     * @return the log entry index for the given index or -1 if not found
+     * @return the log entry index or -1 if not found
      */
-    protected long getLogEntryIndex(long index){
-        if(index == context.getReplicatedLog().getSnapshotIndex()){
+    protected long getLogEntryIndex(long index) {
+        if (index == context.getReplicatedLog().getSnapshotIndex()) {
             return context.getReplicatedLog().getSnapshotIndex();
         }
 
         ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
-        if(entry != null){
+        if (entry != null) {
             return entry.getIndex();
         }
 
@@ -338,15 +344,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
-     * @return the log entry term for the given index or -1 if not found
+     * Returns the actual term of the entry in replicated log for the given index or -1 if not found.
+     *
+     * @return the log entry term or -1 if not found
      */
-    protected long getLogEntryTerm(long index){
-        if(index == context.getReplicatedLog().getSnapshotIndex()){
+    protected long getLogEntryTerm(long index) {
+        if (index == context.getReplicatedLog().getSnapshotIndex()) {
             return context.getReplicatedLog().getSnapshotTerm();
         }
 
         ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
-        if(entry != null){
+        if (entry != null) {
             return entry.getTerm();
         }
 
@@ -354,9 +362,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     /**
-     * Apply the provided index to the state machine
+     * Applies the log entries up to the specified index that is known to be committed to the state machine.
      *
-     * @param index a log index that is known to be committed
+     * @param index the log index
      */
     protected void applyLogToStateMachine(final long index) {
         long newLastApplied = context.getLastApplied();
@@ -381,14 +389,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
-                LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+                log.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
                         logName(), i, i, index);
                 break;
             }
         }
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied);
-        }
+
+        log.debug("{}: Setting last applied to {}", logName(), newLastApplied);
+
         context.setLastApplied(newLastApplied);
 
         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
@@ -422,16 +430,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return internalSwitchBehavior(createBehavior(context, newState));
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
-        if(!context.getRaftPolicy().automaticElectionsEnabled()) {
+        if (!context.getRaftPolicy().automaticElectionsEnabled()) {
             return this;
         }
 
-        LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
+        log.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
         try {
             close();
-        } catch (Exception e) {
-            LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
+        } catch (RuntimeException e) {
+            log.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
         }
         return newBehavior;
     }
@@ -462,20 +471,20 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
 
     /**
-     * Performs a snapshot with no capture on the replicated log.
-     * It clears the log from the supplied index or last-applied-1 which ever is minimum.
+     * Performs a snapshot with no capture on the replicated log. It clears the log from the supplied index or
+     * lastApplied-1 which ever is minimum.
      *
-     * @param snapshotCapturedIndex
+     * @param snapshotCapturedIndex the index from which to clear
      */
     protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
         long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex);
 
-        if(actualIndex != -1){
+        if (actualIndex != -1) {
             setReplicatedToAllIndex(actualIndex);
         }
     }
 
-    protected String getId(){
+    protected String getId() {
         return context.getId();
     }
 }
index 176704f3d377323a962d5a171cfe53c84beb494e..b86d28b5f79c4deaa8f2f8a4923305248ffd4762 100644 (file)
@@ -23,7 +23,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
 /**
- * The behavior of a RaftActor when it is in the CandidateState
+ * The behavior of a RaftActor when it is in the Candidate raft state.
  * <p/>
  * Candidates (§5.2):
  * <ul>
@@ -51,21 +51,19 @@ public class Candidate extends AbstractRaftActorBehavior {
     public Candidate(RaftActorContext context) {
         super(context, RaftState.Candidate);
 
-        for(PeerInfo peer: context.getPeers()) {
-            if(peer.isVoting()) {
+        for (PeerInfo peer: context.getPeers()) {
+            if (peer.isVoting()) {
                 votingPeers.add(peer.getId());
             }
         }
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
-        }
+        log.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
 
         votesRequired = getMajorityVoteCount(votingPeers.size());
 
         startNewTerm();
 
-        if(votingPeers.isEmpty()){
+        if (votingPeers.isEmpty()) {
             actor().tell(ElectionTimeout.INSTANCE, actor());
         } else {
             scheduleElection(electionDuration());
@@ -83,16 +81,13 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
+    protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
-        }
+        log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
 
         // Some other candidate for the same term became a leader and sent us an append entry
-        if(currentTerm() == appendEntries.getTerm()){
-            LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
+        if (currentTerm() == appendEntries.getTerm()) {
+            log.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
                     logName(), currentTerm());
 
             return switchBehavior(new Follower(context));
@@ -108,15 +103,15 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     @Override
     protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
-        LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
+        log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
 
         if (requestVoteReply.isVoteGranted()) {
             voteCount++;
         }
 
         if (voteCount >= votesRequired) {
-            if(context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
-                LOG.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(),
+            if (context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
+                log.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(),
                         context.getReplicatedLog().lastIndex());
                 return internalSwitchBehavior(RaftState.PreLeader);
             } else {
@@ -130,7 +125,7 @@ public class Candidate extends AbstractRaftActorBehavior {
     @Override
     public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
         if (message instanceof ElectionTimeout) {
-            LOG.debug("{}: Received ElectionTimeout", logName());
+            log.debug("{}: Received ElectionTimeout", logName());
 
             if (votesRequired == 0) {
                 // If there are no peers then we should be a Leader
@@ -151,10 +146,8 @@ public class Candidate extends AbstractRaftActorBehavior {
 
             RaftRPC rpc = (RaftRPC) message;
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
+            log.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
                         context.getTermInformation().getCurrentTerm());
-            }
 
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
@@ -187,21 +180,21 @@ public class Candidate extends AbstractRaftActorBehavior {
         long newTerm = currentTerm + 1;
         context.getTermInformation().updateAndPersist(newTerm, context.getId());
 
-        LOG.debug("{}: Starting new term {}", logName(), newTerm);
+        log.debug("{}: Starting new term {}", logName(), newTerm);
 
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
         // amount of time TBD
         for (String peerId : votingPeers) {
             ActorSelection peerActor = context.getPeerActorSelection(peerId);
-            if(peerActor != null) {
+            if (peerActor != null) {
                 RequestVote requestVote = new RequestVote(
                         context.getTermInformation().getCurrentTerm(),
                         context.getId(),
                         context.getReplicatedLog().lastIndex(),
                         context.getReplicatedLog().lastTerm());
 
-                LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
+                log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
 
                 peerActor.tell(requestVote, context.getActor());
             }
index cb141f9f1e5b28f3c7a3f7428ad827d5706881d0..3f68b50a4f44c104bb295a033afe5a5e164d9279 100644 (file)
@@ -40,7 +40,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 
 /**
- * The behavior of a RaftActor in the Follower state
+ * The behavior of a RaftActor in the Follower raft state.
  * <p/>
  * <ul>
  * <li> Respond to RPCs from candidates and leaders
@@ -57,7 +57,7 @@ public class Follower extends AbstractRaftActorBehavior {
     private final SyncStatusTracker initialSyncStatusTracker;
 
     private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
-            logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
+        logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
 
     private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
@@ -110,8 +110,8 @@ public class Follower extends AbstractRaftActorBehavior {
         lastLeaderMessageTimer.start();
     }
 
-    private boolean isLogEntryPresent(long index){
-        if(context.getReplicatedLog().isInSnapshot(index)) {
+    private boolean isLogEntryPresent(long index) {
+        if (context.getReplicatedLog().isInSnapshot(index)) {
             return true;
         }
 
@@ -120,18 +120,18 @@ public class Follower extends AbstractRaftActorBehavior {
 
     }
 
-    private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){
-        initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex());
+    private void updateInitialSyncStatus(long currentLeaderCommit, String newLeaderId) {
+        initialSyncStatusTracker.update(newLeaderId, currentLeaderCommit, context.getCommitIndex());
     }
 
     @Override
     protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
 
         int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
-        } else if(LOG.isDebugEnabled() && numLogEntries > 0) {
-            LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+        if (log.isTraceEnabled()) {
+            log.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
+        } else if (log.isDebugEnabled() && numLogEntries > 0) {
+            log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
         }
 
         // TODO : Refactor this method into a bunch of smaller methods
@@ -139,8 +139,8 @@ public class Follower extends AbstractRaftActorBehavior {
         // cover the code properly
 
         if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
-            LOG.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " +
-                    "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
+            log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the "
+                + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
             snapshotTracker = null;
         }
 
@@ -149,9 +149,7 @@ public class Follower extends AbstractRaftActorBehavior {
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
                     lastIndex(), lastTerm(), context.getPayloadVersion());
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
-            }
+            log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
             sender.tell(reply, actor());
 
             return this;
@@ -169,7 +167,7 @@ public class Follower extends AbstractRaftActorBehavior {
             // We found that the log was out of sync so just send a negative
             // reply and return
 
-            LOG.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}",
+            log.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}",
                         logName(), lastIndex, lastTerm());
 
             sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
@@ -179,7 +177,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
 
-            LOG.debug("{}: Number of entries to be appended = {}", logName(),
+            log.debug("{}: Number of entries to be appended = {}", logName(),
                         appendEntries.getEntries().size());
 
             // 3. If an existing entry conflicts with a new one (same index
@@ -192,35 +190,35 @@ public class Follower extends AbstractRaftActorBehavior {
                 for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
                     ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
 
-                    if(!isLogEntryPresent(matchEntry.getIndex())) {
+                    if (!isLogEntryPresent(matchEntry.getIndex())) {
                         // newEntry not found in the log
                         break;
                     }
 
                     long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex());
 
-                    LOG.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry,
+                    log.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry,
                             existingEntryTerm);
 
                     // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know
                     // what the term was so we'll assume it matches.
-                    if(existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) {
+                    if (existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) {
                         continue;
                     }
 
-                    if(!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
+                    if (!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
 
-                        LOG.debug("{}: Removing entries from log starting at {}", logName(),
+                        log.debug("{}: Removing entries from log starting at {}", logName(),
                                 matchEntry.getIndex());
 
                         // Entries do not match so remove all subsequent entries
-                        if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
+                        if (!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
                             // Could not remove the entries - this means the matchEntry index must be in the
                             // snapshot and not the log. In this case the prior entries are part of the state
                             // so we must send back a reply to force a snapshot to completely re-sync the
                             // follower's log and state.
 
-                            LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName());
+                            log.debug("{}: Could not remove entries - sending reply to force snapshot", logName());
                             sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
                                     lastTerm(), context.getPayloadVersion(), true), actor());
                             return this;
@@ -236,23 +234,23 @@ public class Follower extends AbstractRaftActorBehavior {
             }
 
             lastIndex = lastIndex();
-            LOG.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
+            log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
                     lastIndex, addEntriesFrom);
 
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
                 ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
 
-                LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
+                log.debug("{}: Append entry to log {}", logName(), entry.getData());
 
                 context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
 
-                if(entry.getData() instanceof ServerConfigurationPayload) {
+                if (entry.getData() instanceof ServerConfigurationPayload) {
                     context.updatePeerIds((ServerConfigurationPayload)entry.getData());
                 }
             }
 
-            LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
+            log.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
         }
 
         // 5. If leaderCommit > commitIndex, set commitIndex =
@@ -261,23 +259,22 @@ public class Follower extends AbstractRaftActorBehavior {
         lastIndex = lastIndex();
         long prevCommitIndex = context.getCommitIndex();
 
-        if(appendEntries.getLeaderCommit() > prevCommitIndex) {
+        if (appendEntries.getLeaderCommit() > prevCommitIndex) {
             context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
         }
 
         if (prevCommitIndex != context.getCommitIndex()) {
-            LOG.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
+            log.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
         }
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (§5.3)
         // check if there are any entries to be applied. last-applied can be equal to last-index
-        if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
-            context.getLastApplied() < lastIndex) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: applyLogToStateMachine, " +
-                        "appendEntries.getLeaderCommit(): {}," +
-                        "context.getLastApplied(): {}, lastIndex(): {}", logName(),
+        if (appendEntries.getLeaderCommit() > context.getLastApplied()
+                && context.getLastApplied() < lastIndex) {
+            if (log.isDebugEnabled()) {
+                log.debug("{}: applyLogToStateMachine, appendEntries.getLeaderCommit(): {},"
+                        + "context.getLastApplied(): {}, lastIndex(): {}", logName(),
                     appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex);
             }
 
@@ -287,10 +284,10 @@ public class Follower extends AbstractRaftActorBehavior {
         AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex, lastTerm(), context.getPayloadVersion());
 
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply);
-        } else if(LOG.isDebugEnabled() && numLogEntries > 0) {
-            LOG.debug("{}: handleAppendEntries returning : {}", logName(), reply);
+        if (log.isTraceEnabled()) {
+            log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
+        } else if (log.isDebugEnabled() && numLogEntries > 0) {
+            log.debug("{}: handleAppendEntries returning : {}", logName(), reply);
         }
 
         sender.tell(reply, actor());
@@ -316,39 +313,39 @@ public class Follower extends AbstractRaftActorBehavior {
             // an entry at prevLogIndex and this follower has no entries in
             // it's log.
 
-            LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+            log.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
                         logName(), appendEntries.getPrevLogIndex());
         } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry was not found in it's log
 
-            LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, snapshotIndex: {}",
-                        logName(), appendEntries.getPrevLogIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
+            log.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - "
+                    + "lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
+                    context.getReplicatedLog().getSnapshotIndex());
         } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry does exist in the follower's log but it has
             // a different term in it
 
-            LOG.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries " +
-                      "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(),
-                      prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
-        } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
+            log.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries"
+                      + "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(),
+                      prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex,
+                      context.getReplicatedLog().getSnapshotIndex());
+        } else if (appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
                 && appendEntries.getReplicatedToAllIndex() != -1
                 && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
             // This append entry comes from a leader who has it's log aggressively trimmed and so does not have
             // the previous entry in it's in-memory journal
 
-            LOG.debug(
-                    "{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
-                    logName(), appendEntries.getReplicatedToAllIndex());
-        } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
+            log.debug("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the"
+                    + " in-memory journal", logName(), appendEntries.getReplicatedToAllIndex());
+        } else if (appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
                 && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
                 && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) {
-            LOG.debug(
-                    "{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
-                    logName(), appendEntries.getEntries().get(0).getIndex() - 1);
+            log.debug("{}: Cannot append entries because the calculated previousIndex {} was not found in the "
+                    + " in-memory journal", logName(), appendEntries.getEntries().get(0).getIndex() - 1);
         } else {
             outOfSync = false;
         }
@@ -383,7 +380,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // set currentTerm = T, convert to follower (§5.1)
         // This applies to all RPC messages and responses
         if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-            LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+            log.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
                 logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
             context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
@@ -412,33 +409,33 @@ public class Follower extends AbstractRaftActorBehavior {
         // lastLeaderMessageTimer.
         long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
         long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis();
-        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() ||
-                lastLeaderMessageInterval >= electionTimeoutInMillis;
+        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning()
+                || lastLeaderMessageInterval >= electionTimeoutInMillis;
 
-        if(canStartElection()) {
-            if(message instanceof TimeoutNow) {
-                LOG.debug("{}: Received TimeoutNow - switching to Candidate", logName());
+        if (canStartElection()) {
+            if (message instanceof TimeoutNow) {
+                log.debug("{}: Received TimeoutNow - switching to Candidate", logName());
                 return internalSwitchBehavior(RaftState.Candidate);
-            } else if(noLeaderMessageReceived) {
+            } else if (noLeaderMessageReceived) {
                 // Check the cluster state to see if the leader is known to be up before we go to Candidate.
                 // However if we haven't heard from the leader in a long time even though the cluster state
                 // indicates it's up then something is wrong - leader might be stuck indefinitely - so switch
                 // to Candidate,
                 long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR;
-                if(isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
-                    LOG.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
+                if (isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
+                    log.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
                     scheduleElection(electionDuration());
                 } else {
-                    LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+                    log.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
                     return internalSwitchBehavior(RaftState.Candidate);
                 }
             } else {
-                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
+                log.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
                         logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval());
                 scheduleElection(electionDuration());
             }
-        } else if(message instanceof ElectionTimeout) {
-            if(noLeaderMessageReceived) {
+        } else if (message instanceof ElectionTimeout) {
+            if (noLeaderMessageReceived) {
                 setLeaderId(null);
             }
 
@@ -449,17 +446,17 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     private boolean isLeaderAvailabilityKnown() {
-        if(leaderId == null) {
+        if (leaderId == null) {
             return false;
         }
 
         Optional<Cluster> cluster = context.getCluster();
-        if(!cluster.isPresent()) {
+        if (!cluster.isPresent()) {
             return false;
         }
 
         ActorSelection leaderActor = context.getPeerActorSelection(leaderId);
-        if(leaderActor == null) {
+        if (leaderActor == null) {
             return false;
         }
 
@@ -468,43 +465,43 @@ public class Follower extends AbstractRaftActorBehavior {
         CurrentClusterState state = cluster.get().state();
         Set<Member> unreachable = state.getUnreachable();
 
-        LOG.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
+        log.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
                 unreachable);
 
-        for(Member m: unreachable) {
-            if(leaderAddress.equals(m.address())) {
-                LOG.info("{}: Leader {} is unreachable", logName(), leaderAddress);
+        for (Member m: unreachable) {
+            if (leaderAddress.equals(m.address())) {
+                log.info("{}: Leader {} is unreachable", logName(), leaderAddress);
                 return false;
             }
         }
 
-        for(Member m: state.getMembers()) {
-            if(leaderAddress.equals(m.address())) {
-                if(m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
-                    LOG.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
+        for (Member m: state.getMembers()) {
+            if (leaderAddress.equals(m.address())) {
+                if (m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
+                    log.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
                             leaderAddress, m.status());
                     return true;
                 } else {
-                    LOG.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
+                    log.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
                             leaderAddress, m.status());
                     return false;
                 }
             }
         }
 
-        LOG.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
+        log.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
 
         return false;
     }
 
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
-        LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
+        log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
 
         leaderId = installSnapshot.getLeaderId();
 
-        if(snapshotTracker == null){
-            snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
+        if (snapshotTracker == null) {
+            snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
         }
 
         updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
@@ -513,8 +510,8 @@ public class Follower extends AbstractRaftActorBehavior {
             final InstallSnapshotReply reply = new InstallSnapshotReply(
                     currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
 
-            if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
-                    installSnapshot.getLastChunkHashCode())){
+            if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+                    installSnapshot.getLastChunkHashCode())) {
                 Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
                         new ArrayList<>(),
                         installSnapshot.getLastIncludedIndex(),
@@ -528,7 +525,7 @@ public class Follower extends AbstractRaftActorBehavior {
                 ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
                     @Override
                     public void onSuccess() {
-                        LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+                        log.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
 
                         sender.tell(reply, actor());
                     }
@@ -543,24 +540,17 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 snapshotTracker = null;
             } else {
-                LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+                log.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
 
                 sender.tell(reply, actor());
             }
         } catch (SnapshotTracker.InvalidChunkException e) {
-            LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
+            log.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     -1, false), actor());
             snapshotTracker = null;
 
-        } catch (Exception e){
-            LOG.error("{}: Exception in InstallSnapshot of follower", logName(), e);
-
-            //send reply with success as false. The chunk will be sent again on failure
-            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
-                    installSnapshot.getChunkIndex(), false), actor());
-
         }
     }
 
@@ -570,7 +560,7 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    SnapshotTracker getSnapshotTracker(){
+    SnapshotTracker getSnapshotTracker() {
         return snapshotTracker;
     }
 }
index a02e40092b3e314ee83bf0ae10e5a70e7106abea..658c129e2d6cf0d0924b897448e80be639169141 100644 (file)
@@ -44,7 +44,7 @@ public class IsolatedLeader extends AbstractLeader {
         // it can happen that this isolated leader interacts with a new leader in the cluster and
         // changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated
         if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) {
-            LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId());
+            log.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId());
             return internalSwitchBehavior(new Leader(context, this));
         }
         return ret;
index 827364c29faeefd03149b5b65777195ca3afa5d6..4821d98835371c299c27776a81c4d9998501399d 100644 (file)
@@ -23,7 +23,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 
 /**
- * The behavior of a RaftActor when it is in the Leader state
+ * The behavior of a RaftActor when it is in the Leader state.
  * <p/>
  * Leaders:
  * <ul>
@@ -34,15 +34,14 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
  * respond after entry applied to state machine (§5.3)
  * <li> If last log index â‰¥ nextIndex for a follower: send
  * AppendEntries RPC with log entries starting at nextIndex
- * <ul>
  * <li> If successful: update nextIndex and matchIndex for
  * follower (§5.3)
  * <li> If AppendEntries fails because of log inconsistency:
  * decrement nextIndex and retry (§5.3)
- * </ul>
  * <li> If there exists an N such that N > commitIndex, a majority
  * of matchIndex[i] â‰¥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, Â§5.4).
+ * </ul>
  */
 public class Leader extends AbstractLeader {
     /**
@@ -53,7 +52,7 @@ public class Leader extends AbstractLeader {
     static final Object ISOLATED_LEADER_CHECK = new Object();
 
     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
-    private @Nullable LeadershipTransferContext leadershipTransferContext;
+    @Nullable private LeadershipTransferContext leadershipTransferContext;
 
     Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
         super(context, RaftState.Leader, initializeFromLeader);
@@ -69,7 +68,7 @@ public class Leader extends AbstractLeader {
 
         if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
             if (isLeaderIsolated()) {
-                LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
                     context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
                 return internalSwitchBehavior(new IsolatedLeader(context, this));
             } else {
@@ -81,15 +80,16 @@ public class Leader extends AbstractLeader {
     }
 
     @Override
-    protected void beforeSendHeartbeat(){
-        if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+    protected void beforeSendHeartbeat() {
+        if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
+                > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
             isolatedLeaderCheck.reset().start();
         }
 
-        if(leadershipTransferContext != null && leadershipTransferContext.isExpired(
+        if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
-            LOG.debug("{}: Leadership transfer expired", logName());
+            log.debug("{}: Leadership transfer expired", logName());
             leadershipTransferContext = null;
         }
     }
@@ -117,10 +117,10 @@ public class Leader extends AbstractLeader {
      *     {@link RaftActorLeadershipTransferCohort#abortTtransfer}.</li>
      * </ul>
      *
-     * @param leadershipTransferCohort
+     * @param leadershipTransferCohort the cohort participating in the leadership transfer
      */
     public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
-        LOG.debug("{}: Attempting to transfer leadership", logName());
+        log.debug("{}: Attempting to transfer leadership", logName());
 
         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
 
@@ -129,23 +129,23 @@ public class Leader extends AbstractLeader {
     }
 
     private void tryToCompleteLeadershipTransfer(String followerId) {
-        if(leadershipTransferContext == null) {
+        if (leadershipTransferContext == null) {
             return;
         }
 
         FollowerLogInformation followerInfo = getFollower(followerId);
-        if(followerInfo == null) {
+        if (followerInfo == null) {
             return;
         }
 
         long lastIndex = context.getReplicatedLog().lastIndex();
         boolean isVoting = context.getPeerInfo(followerId).isVoting();
 
-        LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
+        log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
                 logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
 
-        if(isVoting && followerInfo.getMatchIndex() == lastIndex) {
-            LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
+        if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
+            log.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
 
             // We can't be sure if the follower has applied all its log entries to its state so send an
             // additional AppendEntries with the latest commit index.
@@ -155,7 +155,7 @@ public class Leader extends AbstractLeader {
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
             followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
 
-            LOG.debug("{}: Leader transfer complete", logName());
+            log.debug("{}: Leader transfer complete", logName());
 
             leadershipTransferContext.transferCohort.transferComplete();
             leadershipTransferContext = null;
@@ -164,7 +164,7 @@ public class Leader extends AbstractLeader {
 
     @Override
     public void close() {
-        if(leadershipTransferContext != null) {
+        if (leadershipTransferContext != null) {
             LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
             leadershipTransferContext = null;
             localLeadershipTransferContext.transferCohort.abortTransfer();
@@ -192,7 +192,7 @@ public class Leader extends AbstractLeader {
         }
 
         boolean isExpired(long timeout) {
-            if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
+            if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
                 transferCohort.abortTransfer();
                 return true;
             }
index eab3f7d7da1d49feb57f71b801b4cbe6b4362d2a..09f5186a012b51f6e0d4296ccd35af2aeebced8c 100644 (file)
@@ -22,7 +22,7 @@ import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
  * the log with the leader's current term. Once the no-op entry is committed, all prior entries are committed
  * indirectly. Once all entries are committed, ie commitIndex matches the last log index, it switches to the
  * normal Leader state.
- * <p>
+ * <p/>
  * The use of a no-op entry in this manner is outlined in the last paragraph in Â§8 of the
  * <a href="https://raft.github.io/raft.pdf">extended raft version</a>.
  *
@@ -39,7 +39,7 @@ public class PreLeader extends AbstractLeader {
     @Override
     public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
         if (message instanceof ApplyState) {
-            if(context.getLastApplied() >= context.getReplicatedLog().lastIndex()) {
+            if (context.getLastApplied() >= context.getReplicatedLog().lastIndex()) {
                 // We've applied all entries - we can switch to Leader.
                 return internalSwitchBehavior(new Leader(context, this));
             } else {
index bf5116935d108c91ecec947cf7544ecc7f7cd3dc..6b51c34e88ed6143e6c2e7a8c5c156266f83680f 100644 (file)
@@ -13,19 +13,10 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftState;
 
 /**
- * A RaftActorBehavior represents the specific behavior of a RaftActor
- * <p>
- * A RaftActor can behave as one of the following,
- * <ul>
- *     <li> Follower </li>
- *     <li> Candidate </li>
- *     <li> Leader </li>
- * </ul>
- * <p>
- * In each of these behaviors the Raft Actor handles the same Raft messages
- * differently.
+ * The interface for a class that implements a specific behavior of a RaftActor. The types of behaviors are enumerated
+ * by {@link RaftState}. Each handles the same Raft messages differently.
  */
-public interface RaftActorBehavior extends AutoCloseable{
+public interface RaftActorBehavior extends AutoCloseable {
 
     /**
      * Handle a message. If the processing of the message warrants a state
@@ -37,42 +28,50 @@ public interface RaftActorBehavior extends AutoCloseable{
      *
      * @return The new behavior or current behavior, or null if the message was not handled.
      */
-    @Nullable RaftActorBehavior handleMessage(ActorRef sender, Object message);
+    @Nullable
+    RaftActorBehavior handleMessage(ActorRef sender, Object message);
 
     /**
+     * Returns the state associated with this behavior.
      *
-     * @return The state associated with a given behavior
+     * @return the RaftState
      */
     RaftState state();
 
     /**
+     * Returns the id of the leader.
      *
-     * @return The Id of the Leader if known else null
+     * @return the id of the leader or null if not known
      */
+    @Nullable
     String getLeaderId();
 
     /**
-     * setting the index of the log entry which is replicated to all nodes
-     * @param replicatedToAllIndex
+     * Sets the index of the last log entry that has been replicated to all peers.
+     *
+     * @param replicatedToAllIndex the index
      */
     void setReplicatedToAllIndex(long replicatedToAllIndex);
 
     /**
-     * @return the index of the log entry which is replicated to all nodes
+     * Returns the index of the last log entry that has been replicated to all peers.
+     *
+     * @return the index or -1 if not known
      */
     long getReplicatedToAllIndex();
 
     /**
-     * @return the leader's payload data version.
+     * Returns the leader's payload data version.
+     *
+     * @return a short representing the version
      */
     short getLeaderPayloadVersion();
 
     /**
-     * switchBehavior makes sure that the current behavior is shutdown before it switches to the new
-     * behavior
+     * Closes the current behavior and switches to the specified behavior, if possible.
      *
-     * @param behavior The new behavior to switch to
-     * @return The new behavior
+     * @param behavior the new behavior to switch to
+     * @return the new behavior
      */
     RaftActorBehavior switchBehavior(RaftActorBehavior behavior);
 
index fadca3b15d9a174e8b447b963df8bd06e2fe9cde..3ba020b8143aa69f4c106a5c53f150a117bb1d70 100644 (file)
@@ -15,10 +15,10 @@ import java.util.Arrays;
 import org.slf4j.Logger;
 
 /**
- * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
+ * Helper class that maintains state for a snapshot that is being installed in chunks on a Follower.
  */
 public class SnapshotTracker {
-    private final Logger LOG;
+    private final Logger log;
     private final int totalChunks;
     private final String leaderId;
     private ByteString collectedChunks = ByteString.EMPTY;
@@ -26,37 +26,39 @@ public class SnapshotTracker {
     private boolean sealed = false;
     private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
 
-    SnapshotTracker(Logger LOG, int totalChunks, String leaderId) {
-        this.LOG = LOG;
+    SnapshotTracker(Logger log, int totalChunks, String leaderId) {
+        this.log = log;
         this.totalChunks = totalChunks;
         this.leaderId = Preconditions.checkNotNull(leaderId);
     }
 
     /**
-     * Adds a chunk to the tracker
+     * Adds a chunk to the tracker.
      *
-     * @param chunkIndex
-     * @param chunk
-     * @return true when the lastChunk is received
-     * @throws InvalidChunkException
+     * @param chunkIndex the index of the chunk
+     * @param chunk the chunk data
+     * @param lastChunkHashCode the optional hash code for the chunk
+     * @return true if this is the last chunk is received
+     * @throws InvalidChunkException if the chunk index is invalid or out of order
      */
-    boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
-        LOG.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
+    boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> maybeLastChunkHashCode)
+            throws InvalidChunkException {
+        log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
                 chunkIndex, lastChunkIndex, collectedChunks.size(), this.lastChunkHashCode);
 
-        if(sealed){
-            throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
+        if (sealed) {
+            throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex
+                    + " all chunks already received");
         }
 
-        if(lastChunkIndex + 1 != chunkIndex){
+        if (lastChunkIndex + 1 != chunkIndex) {
             throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
         }
 
-        if(lastChunkHashCode.isPresent()){
-            if(lastChunkHashCode.get() != this.lastChunkHashCode){
-                throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
-                        "the senders hash code, expected " + this.lastChunkHashCode + " was " + lastChunkHashCode.get());
-            }
+        if (maybeLastChunkHashCode.isPresent() && maybeLastChunkHashCode.get() != this.lastChunkHashCode) {
+            throw new InvalidChunkException("The hash code of the recorded last chunk does not match "
+                    + "the senders hash code, expected " + this.lastChunkHashCode + " was "
+                    + maybeLastChunkHashCode.get());
         }
 
         sealed = chunkIndex == totalChunks;
@@ -66,15 +68,15 @@ public class SnapshotTracker {
         return sealed;
     }
 
-    byte[] getSnapshot(){
-        if(!sealed) {
+    byte[] getSnapshot() {
+        if (!sealed) {
             throw new IllegalStateException("lastChunk not received yet");
         }
 
         return collectedChunks.toByteArray();
     }
 
-    ByteString getCollectedChunks(){
+    ByteString getCollectedChunks() {
         return collectedChunks;
     }
 
@@ -85,9 +87,8 @@ public class SnapshotTracker {
     public static class InvalidChunkException extends Exception {
         private static final long serialVersionUID = 1L;
 
-        InvalidChunkException(String message){
+        InvalidChunkException(String message) {
             super(message);
         }
     }
-
 }
index 85622a5908bf34074079201a5129618da8f867e8..0986565bac2748b94037a7a19f3f4c6a2912d651 100644 (file)
@@ -40,32 +40,32 @@ public class SyncStatusTracker {
         this.syncThreshold = syncThreshold;
     }
 
-    public void update(String leaderId, long leaderCommit, long commitIndex){
-        leaderId = Preconditions.checkNotNull(leaderId, "leaderId should not be null");
+    public void update(String leaderId, long leaderCommit, long commitIndex) {
+        Preconditions.checkNotNull(leaderId, "leaderId should not be null");
 
-        if(!leaderId.equals(syncedLeaderId)){
+        if (!leaderId.equals(syncedLeaderId)) {
             minimumExpectedIndex = leaderCommit;
             changeSyncStatus(NOT_IN_SYNC, FORCE_STATUS_CHANGE);
             syncedLeaderId = leaderId;
             return;
         }
 
-        if((leaderCommit - commitIndex) > syncThreshold){
+        if (leaderCommit - commitIndex > syncThreshold) {
             changeSyncStatus(NOT_IN_SYNC);
-        } else if((leaderCommit - commitIndex) <= syncThreshold && commitIndex >= minimumExpectedIndex) {
+        } else if (leaderCommit - commitIndex <= syncThreshold && commitIndex >= minimumExpectedIndex) {
             changeSyncStatus(IN_SYNC);
         }
     }
 
-    private void changeSyncStatus(boolean newSyncStatus){
+    private void changeSyncStatus(boolean newSyncStatus) {
         changeSyncStatus(newSyncStatus, !FORCE_STATUS_CHANGE);
     }
 
-    private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange){
-        if(syncStatus == newSyncStatus && !forceStatusChange){
+    private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange) {
+        if (syncStatus == newSyncStatus && !forceStatusChange) {
             return;
         }
         actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender());
         syncStatus = newSyncStatus;
     }
-}
\ No newline at end of file
+}
index 6080232538e22cd755807f449cbb84bbbfad4210..8f23a89b8bdbd4cad1eefc74c270d48d653f102b 100644 (file)
@@ -15,7 +15,7 @@ import java.io.Serializable;
  * Request to locate the leader raft actor. Each {@link org.opendaylight.controller.cluster.raft.RaftActor} must
  * respond with a {@link FindLeaderReply} containing the address of the leader, as it is known to that particular
  * actor.
- *
+ * <p/>
  * This message is intended for testing purposes only.
  */
 @VisibleForTesting
index 549fe038ed5467ca77d963816d4325bbcee0ad3d..5fbeb92fbdd0459d2ea5ff73a06f033935c77fe0 100644 (file)
@@ -17,7 +17,7 @@ import javax.annotation.Nullable;
  * Reply to {@link FindLeader} message, containing the address of the leader actor, as known to the raft actor which
  * sent the message. If the responding actor does not have knowledge of the leader, {@link #getLeaderActor()} will
  * return {@link Optional#empty()}.
- *
+ * <p/>
  * This message is intended for testing purposes only.
  */
 @VisibleForTesting
index 0bd85b1e6d19082eb1f86f0c9f272c3bddafec8a..cf9bb620dd6a4aa98e73a5ba7198ff0e1bdd0b7c 100644 (file)
@@ -13,7 +13,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * The response to a GetOnDemandRaftState message,
+ * The response to a GetOnDemandRaftState message.
  *
  * @author Thomas Pantelis
  */
index 0aabb49f6be4896738401499dbf54cc66720d7dc..fc5255e2712cea11e9932d18ecd9a898cae0c660 100644 (file)
@@ -14,7 +14,7 @@ public class AbstractRaftRPC implements RaftRPC {
     // term
     private long term;
 
-    protected AbstractRaftRPC(long term){
+    protected AbstractRaftRPC(long term) {
         this.term = term;
     }
 
index dc1ebf0730fd49bd7b3b73ba3c54e4c5127bc02c..487ea9b09db22ebce3d4d16d2dbc185dd61b0063 100644 (file)
@@ -34,8 +34,7 @@ public class AppendEntries extends AbstractRaftRPC {
     // term of prevLogIndex entry
     private final long prevLogTerm;
 
-    // log entries to store (empty for heartbeat;
-    // may send more than one for efficiency)
+    // log entries to store (empty for heart beat - may send more than one for efficiency)
     private transient List<ReplicatedLogEntry> entries;
 
     // leader's commitIndex
@@ -105,6 +104,9 @@ public class AppendEntries extends AbstractRaftRPC {
 
         private AppendEntries appendEntries;
 
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
         }
 
@@ -123,7 +125,7 @@ public class AppendEntries extends AbstractRaftRPC {
             out.writeShort(appendEntries.payloadVersion);
 
             out.writeInt(appendEntries.entries.size());
-            for(ReplicatedLogEntry e: appendEntries.entries) {
+            for (ReplicatedLogEntry e: appendEntries.entries) {
                 out.writeLong(e.getIndex());
                 out.writeLong(e.getTerm());
                 out.writeObject(e.getData());
@@ -142,7 +144,7 @@ public class AppendEntries extends AbstractRaftRPC {
 
             int size = in.readInt();
             List<ReplicatedLogEntry> entries = new ArrayList<>(size);
-            for(int i = 0; i < size; i++) {
+            for (int i = 0; i < size; i++) {
                 entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
             }
 
index 7784c7eec9c91413180cce47fff68467bec198f6..e2f0ba9014c9e469b21599359db3007e7cb4e2be 100644 (file)
@@ -15,7 +15,7 @@ import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 
 /**
- * Reply for the AppendEntriesRpc message
+ * Reply for the AppendEntries message.
  */
 public class AppendEntriesReply extends AbstractRaftRPC {
     private static final long serialVersionUID = -7487547356392536683L;
@@ -110,6 +110,9 @@ public class AppendEntriesReply extends AbstractRaftRPC {
 
         private AppendEntriesReply appendEntriesReply;
 
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
         }
 
index 5958cfdedf9d7460001cf93cbeda28436da6c383..4528dcbf5e9370dec5631fd7efcd97745aaa876b 100644 (file)
@@ -15,6 +15,9 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 
+/**
+ * Message sent from a leader to install a snapshot chunk on a follower.
+ */
 public class InstallSnapshot extends AbstractRaftRPC {
     private static final long serialVersionUID = 1L;
 
@@ -28,7 +31,8 @@ public class InstallSnapshot extends AbstractRaftRPC {
     private final Optional<ServerConfigurationPayload> serverConfig;
 
     public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, byte[] data,
-            int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode, Optional<ServerConfigurationPayload> serverConfig) {
+            int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode,
+            Optional<ServerConfigurationPayload> serverConfig) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
@@ -100,6 +104,9 @@ public class InstallSnapshot extends AbstractRaftRPC {
 
         private InstallSnapshot installSnapshot;
 
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
         }
 
@@ -117,12 +124,12 @@ public class InstallSnapshot extends AbstractRaftRPC {
             out.writeInt(installSnapshot.totalChunks);
 
             out.writeByte(installSnapshot.lastChunkHashCode.isPresent() ? 1 : 0);
-            if(installSnapshot.lastChunkHashCode.isPresent()) {
+            if (installSnapshot.lastChunkHashCode.isPresent()) {
                 out.writeInt(installSnapshot.lastChunkHashCode.get().intValue());
             }
 
             out.writeByte(installSnapshot.serverConfig.isPresent() ? 1 : 0);
-            if(installSnapshot.serverConfig.isPresent()) {
+            if (installSnapshot.serverConfig.isPresent()) {
                 out.writeObject(installSnapshot.serverConfig.get());
             }
 
@@ -140,13 +147,13 @@ public class InstallSnapshot extends AbstractRaftRPC {
 
             Optional<Integer> lastChunkHashCode = Optional.absent();
             boolean chunkHashCodePresent = in.readByte() == 1;
-            if(chunkHashCodePresent) {
+            if (chunkHashCodePresent) {
                 lastChunkHashCode = Optional.of(in.readInt());
             }
 
             Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
             boolean serverConfigPresent = in.readByte() == 1;
-            if(serverConfigPresent) {
+            if (serverConfigPresent) {
                 serverConfig = Optional.of((ServerConfigurationPayload)in.readObject());
             }
 
index 41d2ae68c90e8e2f3846d6c3806693843f2b091c..4f0ca1320f3db7ad98444368a374f171c5a883e6 100644 (file)
@@ -58,6 +58,9 @@ public class InstallSnapshotReply extends AbstractRaftRPC {
 
         private InstallSnapshotReply installSnapshotReply;
 
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
         }
 
index ea56aeca568784dca1e72d6f7295126bd711fdd1..bbb1222effccbc5ee276bfc64dd6aff6adcefed7 100644 (file)
@@ -25,8 +25,6 @@ public final class RequestVoteReply extends AbstractRaftRPC {
 
     @Override
     public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("RequestVoteReply [term=").append(getTerm()).append(", voteGranted=").append(voteGranted).append("]");
-        return builder.toString();
+        return "RequestVoteReply [term=" + getTerm() + ", voteGranted=" + voteGranted + "]";
     }
 }
index fba601cda6c24630862864bb94ff50d285d19414..3375137adac63d876463ac46982d3626f2f21137 100644 (file)
@@ -30,8 +30,6 @@ public class ServerRemoved implements Serializable {
 
     @Override
     public String toString() {
-        return "ServerRemoved{" +
-                "serverId='" + serverId + '\'' +
-                '}';
+        return "ServerRemoved [serverId=" + serverId + "]";
     }
 }
index 4d66fa6d4d3d0fbf4dac4bf006e2f259c63e614e..f5c44f87e4fe687f636a7b9f101b667a228552e7 100644 (file)
@@ -8,14 +8,14 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 /**
- * Local message sent to self on receiving InstallSnapshotReply from a follower, this message indicates that
- * the catchup of the follower is done succesfully during AddServer scenario
+ * Local message sent to self on receiving the InstallSnapshotReply from a follower indicating that
+ * the catch up of the follower has completed successfully for an AddServer operation.
  */
 public class  UnInitializedFollowerSnapshotReply {
     private final String followerId;
 
-    public UnInitializedFollowerSnapshotReply(String followerId){
-       this.followerId = followerId;
+    public UnInitializedFollowerSnapshotReply(String followerId) {
+        this.followerId = followerId;
     }
 
     public String getFollowerId() {
index a21c959a9ac4d737387db6bf7ddbe501275841ab..cad23a93980aa816363ab2f03c5f3ec229ee3088 100644 (file)
@@ -27,6 +27,9 @@ public class ApplyJournalEntries implements Serializable, MigratedSerializable {
 
         private ApplyJournalEntries applyEntries;
 
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
             // For Externalizable
         }
@@ -38,7 +41,7 @@ public class ApplyJournalEntries implements Serializable, MigratedSerializable {
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
             out.writeLong(applyEntries.toIndex);
-         }
+        }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
index 5650360c5ff89d24a09d1021066611fea44116f9..138ea0cfcf5de810a1ac0375125b76b42448397b 100644 (file)
@@ -24,6 +24,9 @@ public class DeleteEntries implements Serializable, MigratedSerializable {
 
         private DeleteEntries deleteEntries;
 
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
             // For Externalizable
         }
@@ -35,7 +38,7 @@ public class DeleteEntries implements Serializable, MigratedSerializable {
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
             out.writeLong(deleteEntries.fromIndex);
-         }
+        }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
index bd9bb3e417426113ea5ebaf2849df0a4d407607d..0f206b5d39d5b70f314801919f05292c683fa245 100644 (file)
@@ -33,6 +33,9 @@ public final class ServerConfigurationPayload extends Payload implements Persist
 
         private List<ServerInfo> serverConfig;
 
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
             // For Externalizable
         }
@@ -48,7 +51,7 @@ public final class ServerConfigurationPayload extends Payload implements Persist
                 out.writeObject(i.getId());
                 out.writeBoolean(i.isVoting());
             }
-         }
+        }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
index ae86921b926ebab030fd8b1f457a20ef11b5001b..55096d677e60019cfd86378a0f5fda71b8e61fdb 100644 (file)
@@ -22,6 +22,9 @@ public class UpdateElectionTerm implements Serializable, MigratedSerializable {
 
         private UpdateElectionTerm updateElectionTerm;
 
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
             // For Externalizable
         }
@@ -34,7 +37,7 @@ public class UpdateElectionTerm implements Serializable, MigratedSerializable {
         public void writeExternal(final ObjectOutput out) throws IOException {
             out.writeLong(updateElectionTerm.currentTerm);
             out.writeObject(updateElectionTerm.votedFor);
-         }
+        }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
index a66caa456a45af5c2be4885f77749f9c0e49f824..c535c2299d5cbbefa1d128f7eb96ab8f725034f7 100644 (file)
@@ -13,7 +13,7 @@ package org.opendaylight.controller.cluster.raft.policy;
  * we may want to be able to determine which Raft replica should become the leader - with Raft elections are
  * randomized so it is not possible to specify which replica should be the leader. The ability to specify
  * the leader would be quite useful when testing a raft cluster.
- *
+ * <p/>
  * Similarly we may want to customize when exactly we apply a modification to the state - with Raft a modification
  * is only applied to the state when the modification is replicated to a majority of the replicas. The ability to
  * apply a modification to the state before consensus would be useful in scenarios where you have only 2 nodes
@@ -31,10 +31,10 @@ public interface RaftPolicy {
 
     /**
      * According to Raft consensus on a Raft entry is achieved only after a Leader replicates a log entry to a
-     * majority of it's followers
+     * majority of it's followers.
      *
      * @return true if modification should be applied before consensus, false to apply modification to state
-     * as per Raft
+     *     as per Raft
      */
     boolean applyModificationToStateBeforeConsensus();
 }