From: Tom Pantelis Date: Tue, 4 Oct 2016 19:38:58 +0000 (-0400) Subject: Fix warnings and clean up javadocs in sal-akka-raft X-Git-Tag: release/carbon~447 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=660c3e22ca97bc613ea6f6288503620bba6fb233 Fix warnings and clean up javadocs in sal-akka-raft Fixed a lot of checkstyle warnings and cleaned up javadocs for the remaining classes sal-akka-raft. Most of the warnings/changes were for: - white space before if/for - white space before beginning brace - line too long - period after first sentence in javadoc - missing

in javadoc Change-Id: I99a3cd08af10d46acecd0e22f04d54b95e2287d9 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java index d449822163..9fb5554abc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java @@ -13,7 +13,7 @@ import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.raft.Snapshot; /** - * Internal message, issued by follower to its actor + * Internal message, issued by follower to its actor. */ public class ApplySnapshot { private static final Callback NOOP_CALLBACK = new Callback() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index 7c182f04e4..14bd3a0af4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -38,7 +38,8 @@ public class CaptureSnapshot { this.installSnapshotInitiated = installSnapshotInitiated; this.replicatedToAllIndex = replicatedToAllIndex; this.replicatedToAllTerm = replicatedToAllTerm; - this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.emptyList(); + this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : + Collections.emptyList(); } public long getLastAppliedIndex() { @@ -80,7 +81,8 @@ public class CaptureSnapshot { .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=") .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated) .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=") - .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]"); + .append(replicatedToAllTerm).append(", unAppliedEntries size=") + .append(unAppliedEntries.size()).append("]"); return builder.toString(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java index d2100cb950..00ef63cf36 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java @@ -13,8 +13,7 @@ import javax.annotation.Nonnull; /** * The FollowerInitialSyncUpStatus is sent by a Follower to inform any RaftActor subclass whether the Follower - * is at least at the same commitIndex as the Leader was when it sent the follower the very first heartbeat. - * + * is at least at the same commitIndex as the Leader was when it sent the follower the very first heart beat. * This status can be used to determine if a Follower has caught up with the current Leader in an upgrade scenario * for example. */ diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendHeartBeat.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendHeartBeat.java index 5e1f20b97d..ba1c157d37 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendHeartBeat.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendHeartBeat.java @@ -11,10 +11,7 @@ package org.opendaylight.controller.cluster.raft.base.messages; import java.io.Serializable; /** - * This messages is sent to the Leader to prompt it to send a heartbeat - * to it's followers. - * - * Typically the Leader to itself on a schedule + * This messages is sent via a schedule to the Leader to prompt it to send a heart beat to its followers. */ public final class SendHeartBeat implements Serializable { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java index 94e31846ee..de33b8c95b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java @@ -13,8 +13,8 @@ import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.raft.Snapshot; /** - * Internal message sent from the SnapshotManager to its associated leader. The leader is expected to apply the - * {@link Snapshot} to its state. + * Internal message sent from the SnapshotManager to its associated leader when a snapshot capture is complete to + * prompt the leader to install the snapshot on its followers as needed. */ public final class SendInstallSnapshot { private final Snapshot snapshot; @@ -23,7 +23,8 @@ public final class SendInstallSnapshot { this.snapshot = Preconditions.checkNotNull(snapshot); } - @Nonnull public Snapshot getSnapshot() { + @Nonnull + public Snapshot getSnapshot() { return snapshot; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java index 9596bb3881..162878f3fd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java @@ -29,10 +29,6 @@ public class SwitchBehavior { @Override public String toString() { - final StringBuilder sb = new StringBuilder("SwitchBehavior{"); - sb.append("newState=").append(newState); - sb.append(", newTerm=").append(newTerm); - sb.append('}'); - return sb.toString(); + return "SwitchBehavior [newState=" + newState + ", newTerm=" + newTerm + "]"; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 479ca5ae7b..fab1714989 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -60,15 +60,14 @@ import scala.concurrent.duration.FiniteDuration; * respond after entry applied to state machine (§5.3) *

  • If last log index ≥ nextIndex for a follower: send * AppendEntries RPC with log entries starting at nextIndex - * *
  • If there exists an N such that N > commitIndex, a majority * of matchIndex[i] ≥ N, and log[N].term == currentTerm: * set commitIndex = N (§5.3, §5.4). + * */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Map followerToLog = new HashMap<>(); @@ -76,7 +75,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { /** * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really * expect the entries to be modified in sequence, hence we open-code the lookup. - * * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly, * but we already expect those to be far from frequent. */ @@ -90,18 +88,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { @Nullable AbstractLeader initializeFromLeader) { super(context, state); - if(initializeFromLeader != null) { + if (initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); snapshot = initializeFromLeader.snapshot; trackers.addAll(initializeFromLeader.trackers); } else { - for(PeerInfo peerInfo: context.getPeers()) { + for (PeerInfo peerInfo: context.getPeers()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); followerToLog.put(peerInfo.getId(), followerLogInformation); } } - LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); + log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); updateMinReplicaCount(); @@ -133,7 +131,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getPeerInfo(followerId), -1, context); followerToLog.put(followerId, followerLogInformation); - if(heartbeatSchedule == null) { + if (heartbeatSchedule == null) { scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } } @@ -144,8 +142,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public void updateMinReplicaCount() { int numVoting = 0; - for(PeerInfo peer: context.getPeers()) { - if(peer.isVoting()) { + for (PeerInfo peer: context.getPeers()) { + if (peer.isVoting()) { numVoting++; } } @@ -153,7 +151,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { minReplicationCount = getMajorityVoteCount(numVoting); } - protected int getMinIsolatedLeaderPeerCount(){ + protected int getMinIsolatedLeaderPeerCount() { //the isolated Leader peer count will be 1 less than the majority vote count. //this is because the vote count has the self vote counted in it //for e.g @@ -161,12 +159,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 - return minReplicationCount > 0 ? (minReplicationCount - 1) : 0; + return minReplicationCount > 0 ? minReplicationCount - 1 : 0; } @VisibleForTesting void setSnapshot(@Nullable Snapshot snapshot) { - if(snapshot != null) { + if (snapshot != null) { this.snapshot = Optional.of(new SnapshotHolder(snapshot)); } else { this.snapshot = Optional.absent(); @@ -182,30 +180,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); + log.debug("{}: handleAppendEntries: {}", logName(), appendEntries); return this; } @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - if(LOG.isTraceEnabled()) { - LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); - } + log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); // Update the FollowerLogInformation String followerId = appendEntriesReply.getFollowerId(); FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - if(followerLogInformation == null){ - LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); + if (followerLogInformation == null) { + log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); return this; } - if(followerLogInformation.timeSinceLastActivity() > - context.getConfigParams().getElectionTimeOutInterval().toMillis()) { - LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + - "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", + if (followerLogInformation.timeSinceLastActivity() + > context.getConfigParams().getElectionTimeOutInterval().toMillis()) { + log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), context.getLastApplied(), context.getCommitIndex()); } @@ -217,7 +213,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex); boolean updated = false; - if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { + if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { // The follower's log is actually ahead of the leader's log. Normally this doesn't happen // in raft as a node cannot become leader if it's log is behind another's. However, the // non-voting semantics deviate a bit from raft. Only voting members participate in @@ -231,9 +227,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // snapshot. It's also possible that the follower's last log index is behind the leader's. // However in this case the log terms won't match and the logs will conflict - this is handled // elsewhere. - LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot", - logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(), - context.getReplicatedLog().lastIndex()); + log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - " + + "forcing install snaphot", logName(), followerLogInformation.getId(), + appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex()); followerLogInformation.setMatchIndex(-1); followerLogInformation.setNextIndex(-1); @@ -242,8 +238,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { updated = true; } else if (appendEntriesReply.isSuccess()) { - if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 && - followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) { + if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 + && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) { // The follower's last entry is present in the leader's journal but the terms don't match so the // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot @@ -255,17 +251,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.setNextIndex(followerLastLogIndex - 1); updated = true; - LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " + - "leader's {} - set the follower's next index to {}", - logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(), + log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " + + "leader's {} - set the follower's next index to {}", logName(), + followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(), followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex()); } else { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } } else { - LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); + log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); - if(appendEntriesReply.isForceInstallSnapshot()) { + if (appendEntriesReply.isForceInstallSnapshot()) { // Reset the followers match and next index. This is to signal that this follower has nothing // in common with this Leader and so would require a snapshot to be installed followerLogInformation.setMatchIndex(-1); @@ -273,8 +269,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Force initiate a snapshot capture initiateCaptureSnapshot(followerId); - } else if(followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 && - followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) { + } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 + && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) { // The follower's log is empty or the last entry is present in the leader's journal // and the terms match so the follower is just behind the leader's journal from // the last snapshot, if any. We'll catch up the follower quickly by starting at the @@ -288,7 +284,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.decrNextIndex(); updated = true; - LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}", + log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}", logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex()); } @@ -298,34 +294,36 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If there exists an N such that N > commitIndex, a majority // of matchIndex[i] ≥ N, and log[N].term == currentTerm: // set commitIndex = N (§5.3, §5.4). - if(LOG.isTraceEnabled()) { - LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", + if (log.isTraceEnabled()) { + log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); } - for (long N = context.getCommitIndex() + 1; ; N++) { + for (long index = context.getCommitIndex() + 1; ; index++) { int replicatedCount = 1; - LOG.trace("{}: checking Nth index {}", logName(), N); + log.trace("{}: checking Nth index {}", logName(), index); for (FollowerLogInformation info : followerToLog.values()) { final PeerInfo peerInfo = context.getPeerInfo(info.getId()); - if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) { + if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) { replicatedCount++; - } else if(LOG.isTraceEnabled()) { - LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), + } else if (log.isTraceEnabled()) { + log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), info.getMatchIndex(), peerInfo); } } - if(LOG.isTraceEnabled()) { - LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount); + if (log.isTraceEnabled()) { + log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, + minReplicationCount); } if (replicatedCount >= minReplicationCount) { - ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); if (replicatedLogEntry == null) { - LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", - logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size()); + log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), index, context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().size()); break; } @@ -335,23 +333,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // reach consensus, as per §5.4.1: "once an entry from the current term is committed by // counting replicas, then all prior entries are committed indirectly". if (replicatedLogEntry.getTerm() == currentTerm()) { - LOG.trace("{}: Setting commit index to {}", logName(), N); - context.setCommitIndex(N); + log.trace("{}: Setting commit index to {}", logName(), index); + context.setCommitIndex(index); } else { - LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}", - logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); + log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, " + + "term {} does not match the current term {}", logName(), index, + replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); } } else { - LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); + log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); break; } } // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", - logName(), followerId, context.getCommitIndex(), context.getLastApplied()); + if (log.isDebugEnabled()) { + log.debug( + "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied()); } applyLogToStateMachine(context.getCommitIndex()); @@ -372,10 +372,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; - if(updated && LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", - logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), - followerLogInformation.getNextIndex()); + if (updated && log.isDebugEnabled()) { + log.debug( + "{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", + logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); } return updated; } @@ -425,7 +426,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", + log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); @@ -438,7 +439,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { beforeSendHeartbeat(); sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); - } else if(message instanceof SendInstallSnapshot) { + } else if (message instanceof SendInstallSnapshot) { // received from RaftActor setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); @@ -454,20 +455,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { - LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); + log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); String followerId = reply.getFollowerId(); FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - if(followerLogInformation == null) { + if (followerLogInformation == null) { // This can happen during AddServer if it times out. - LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", + log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); if (installSnapshotState == null) { - LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", + log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } @@ -477,22 +478,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) { boolean wasLastChunk = false; if (reply.isSuccess()) { - if(installSnapshotState.isLastChunk(reply.getChunkIndex())) { + if (installSnapshotState.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply - if(LOG.isDebugEnabled()) { - LOG.debug("{}: InstallSnapshotReply received, " + - "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}", - logName(), reply.getChunkIndex(), followerId, - context.getReplicatedLog().getSnapshotIndex() + 1 - ); - } + log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -" + + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId, + context.getReplicatedLog().getSnapshotIndex() + 1); long followerMatchIndex = snapshot.get().getLastIncludedIndex(); followerLogInformation.setMatchIndex(followerMatchIndex); followerLogInformation.setNextIndex(followerMatchIndex + 1); followerLogInformation.clearLeaderInstallSnapshotState(); - LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", + log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", logName(), followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); @@ -503,17 +500,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } wasLastChunk = true; - if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ + if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = new UnInitializedFollowerSnapshotReply(followerId); context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); - LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self"); + log.debug("Sent message UnInitializedFollowerSnapshotReply to self"); } } else { installSnapshotState.markSendStatus(true); } } else { - LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", + log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", logName(), reply.getChunkIndex()); installSnapshotState.markSendStatus(false); @@ -524,17 +521,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { purgeInMemoryLog(); } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) { ActorSelection followerActor = context.getPeerActorSelection(followerId); - if(followerActor != null) { + if (followerActor != null) { sendSnapshotChunk(followerActor, followerLogInformation); } } } else { - LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", + log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", logName(), reply.getChunkIndex(), followerId, installSnapshotState.getChunkIndex()); - if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){ + if (reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) { // Since the Follower did not find this index to be valid we should reset the follower snapshot // so that Installing the snapshot can resume from the beginning installSnapshotState.reset(); @@ -543,8 +540,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private boolean anyFollowersInstallingSnapshot() { - for(FollowerLogInformation info: followerToLog.values()) { - if(info.getInstallSnapshotState() != null) { + for (FollowerLogInformation info: followerToLog.values()) { + if (info.getInstallSnapshotState() != null) { return true; } @@ -556,12 +553,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), + log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); // Create a tracker entry we will use this later to notify the // client actor - if(replicate.getClientActor() != null) { + if (replicate.getClientActor() != null) { trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), logIndex)); } @@ -569,7 +566,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { boolean applyModificationToState = !context.anyVotingPeers() || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); - if(applyModificationToState){ + if (applyModificationToState) { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } @@ -585,20 +582,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final String followerId = e.getKey(); final FollowerLogInformation followerLogInformation = e.getValue(); // This checks helps not to send a repeat message to the follower - if(!followerLogInformation.isFollowerActive() || - followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { + if (!followerLogInformation.isFollowerActive() + || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat); } } } /** - * * This method checks if any update needs to be sent to the given follower. This includes append log entries, * sending next snapshot chunk, and initiating a snapshot. + * * @return true if any update is sent, false otherwise */ - private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, boolean sendHeartbeat, boolean isHeartbeat) { @@ -614,7 +610,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // if install snapshot is in process , then sent next chunk if possible if (isFollowerActive && installSnapshotState.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerLogInformation); - } else if(sendHeartbeat) { + } else if (sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntries = true; } @@ -622,17 +618,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { - LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); + if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) { + log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, " + + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive, + followerNextIndex, leaderLastIndex, leaderSnapShotIndex); } if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { - LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), + log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), followerNextIndex, followerId); - if(followerLogInformation.okToReplicate()) { + if (followerLogInformation.okToReplicate()) { // Try to send all the entries in the journal but not exceeding the max data size // for a single AppendEntries message. int maxEntries = (int) context.getReplicatedLog().size(); @@ -640,16 +637,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getConfigParams().getSnapshotChunkSize()); sendAppendEntries = true; } - } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { + } else if (isFollowerActive && followerNextIndex >= 0 + && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " + - "follower-nextIndex: %d, leader-snapshot-index: %d, " + - "leader-last-index: %d", logName(), followerId, + if (log.isDebugEnabled()) { + log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " + + "follower-nextIndex: %d, leader-snapshot-index: %d, " + + "leader-last-index: %d", logName(), followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); } @@ -659,7 +656,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { initiateCaptureSnapshot(followerId); } - } else if(sendHeartbeat) { + } else if (sendHeartbeat) { // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again sendAppendEntries = true; @@ -667,7 +664,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } - if(sendAppendEntries) { + if (sendAppendEntries) { sendAppendEntriesToFollower(followerActor, entries, followerLogInformation); } } @@ -692,8 +689,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { getLogEntryTerm(followerNextIndex - 1), entries, leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion()); - if(!entries.isEmpty() || LOG.isTraceEnabled()) { - LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(), + if (!entries.isEmpty() || log.isTraceEnabled()) { + log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(), appendEntries); } @@ -702,7 +699,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { /** * Initiates a snapshot capture to install on a follower. - * + *

    * Install Snapshot works as follows * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor. * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to @@ -731,7 +728,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), this.getReplicatedToAllIndex(), followerId); - if(captureInitiated) { + if (captureInitiated) { followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState( context.getConfigParams().getSnapshotChunkSize(), logName())); } @@ -740,19 +737,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - private boolean canInstallSnapshot(long nextIndex){ + private boolean canInstallSnapshot(long nextIndex) { // If the follower's nextIndex is -1 then we might as well send it a snapshot // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present // in the snapshot - return nextIndex == -1 || - (!context.getReplicatedLog().isPresent(nextIndex) - && context.getReplicatedLog().isInSnapshot(nextIndex)); + return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex) + && context.getReplicatedLog().isInSnapshot(nextIndex); } private void sendInstallSnapshot() { - LOG.debug("{}: sendInstallSnapshot", logName()); + log.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { String followerId = e.getKey(); ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -760,9 +756,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { long nextIndex = followerLogInfo.getNextIndex(); - if (followerLogInfo.getInstallSnapshotState() != null || - context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED || - canInstallSnapshot(nextIndex)) { + if (followerLogInfo.getInstallSnapshotState() != null + || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED + || canInstallSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, followerLogInfo); } } @@ -787,12 +783,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); - LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), + log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), nextSnapshotChunk.length); int nextChunkIndex = installSnapshotState.incrementChunkIndex(); Optional serverConfig = Optional.absent(); - if(installSnapshotState.isLastChunk(nextChunkIndex)) { + if (installSnapshotState.isLastChunk(nextChunkIndex)) { serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); } @@ -809,17 +805,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { actor() ); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - logName(), followerActor.path(), installSnapshotState.getChunkIndex(), - installSnapshotState.getTotalChunks()); - } + log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), + installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); } } private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - LOG.trace("{}: Sending heartbeat", logName()); + log.trace("{}: Sending heartbeat", logName()); sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true); } } @@ -868,7 +861,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId()); - if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { + if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { --minPresent; if (minPresent == 0) { return false; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 375b3779b8..5c5c520761 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -28,41 +28,43 @@ import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; /** - * Abstract class that represents the behavior of a RaftActor - *

    - * All Servers: - *

      - *
    • If commitIndex > lastApplied: increment lastApplied, apply - * log[lastApplied] to state machine (§5.3) - *
    • If RPC request or response contains term T > currentTerm: - * set currentTerm = T, convert to follower (§5.1) + * Abstract class that provides common code for a RaftActor behavior. */ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { /** - * Information about the RaftActor whose behavior this class represents + * Information about the RaftActor whose behavior this class represents. */ protected final RaftActorContext context; /** - * + * Used for message logging. */ - protected final Logger LOG; + protected final Logger log; /** - * + * Prepended to log messages to provide appropriate context. */ - private Cancellable electionCancel = null; - - private long replicatedToAllIndex = -1; - private final String logName; + /** + * The RaftState corresponding to his behavior. + */ private final RaftState state; + /** + * Used to cancel a scheduled election. + */ + private Cancellable electionCancel = null; + + /** + * The index of the last log entry that has been replicated to all raft peers. + */ + private long replicatedToAllIndex = -1; + AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) { this.context = Preconditions.checkNotNull(context); this.state = Preconditions.checkNotNull(state); - this.LOG = context.getLogger(); + this.log = context.getLogger(); logName = String.format("%s (%s)", context.getId(), state); } @@ -118,29 +120,22 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries); - /** - * appendEntries first processes the AppendEntries message and then - * delegates handling to a specific behavior + * Handles the common logic for the AppendEntries message and delegates handling to the derived class. * - * @param sender - * @param appendEntries + * @param sender the ActorRef that sent the message + * @param appendEntries the message * @return a new behavior if it was changed or the current behavior */ - protected RaftActorBehavior appendEntries(ActorRef sender, - AppendEntries appendEntries) { + protected RaftActorBehavior appendEntries(ActorRef sender, AppendEntries appendEntries) { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Cannot append entries because sender term {} is less than {}", - logName(), appendEntries.getTerm(), currentTerm()); - } + log.debug("{}: Cannot append entries because sender term {} is less than {}", logName(), + appendEntries.getTerm(), currentTerm()); - sender.tell( - new AppendEntriesReply(context.getId(), currentTerm(), false, - lastIndex(), lastTerm(), context.getPayloadVersion()), actor() - ); + sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(), + context.getPayloadVersion()), actor()); return this; } @@ -164,33 +159,32 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { AppendEntriesReply appendEntriesReply); /** - * requestVote handles the RequestVote message. This logic is common - * for all behaviors + * Handles the logic for the RequestVote message that is common for all behaviors. * - * @param sender - * @param requestVote + * @param sender the ActorRef that sent the message + * @param requestVote the message * @return a new behavior if it was changed or the current behavior */ protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { - LOG.debug("{}: In requestVote: {}", logName(), requestVote); + log.debug("{}: In requestVote: {}", logName(), requestVote); boolean grantVote = canGrantVote(requestVote); - if(grantVote) { + if (grantVote) { context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId()); } RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote); - LOG.debug("{}: requestVote returning: {}", logName(), reply); + log.debug("{}: requestVote returning: {}", logName(), reply); sender.tell(reply, actor()); return this; } - protected boolean canGrantVote(RequestVote requestVote){ + protected boolean canGrantVote(RequestVote requestVote) { boolean grantVote = false; // Reply false if term < currentTerm (§5.1) @@ -213,7 +207,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // more up-to-date. if (requestVote.getLastLogTerm() > lastTerm()) { candidateLatest = true; - } else if ((requestVote.getLastLogTerm() == lastTerm()) + } else if (requestVote.getLastLogTerm() == lastTerm() && requestVote.getLastLogIndex() >= lastIndex()) { candidateLatest = true; } @@ -241,6 +235,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { RequestVoteReply requestVoteReply); /** + * Returns a duration for election with an additional variance for randomness. * * @return a random election duration */ @@ -251,7 +246,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** - * stop the scheduled election + * Stops the currently scheduled election. */ protected void stopElection() { if (electionCancel != null && !electionCancel.isCancelled()) { @@ -264,7 +259,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** - * schedule a new election + * Schedule a new election. * * @param interval the duration after which we should trigger a new election */ @@ -277,6 +272,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** + * Returns the current election term. + * * @return the current term */ protected long currentTerm() { @@ -284,6 +281,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** + * Returns the id of the candidate that this server voted for in current term. + * * @return the candidate for whom we voted in the current term */ protected String votedFor() { @@ -291,46 +290,53 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** - * @return the actor associated with this behavior + * Returns the actor associated with this behavior. + * + * @return the actor */ protected ActorRef actor() { return context.getActor(); } /** + * Returns the term of the last entry in the log. * - * @return the term from the last entry in the log + * @return the term */ protected long lastTerm() { return context.getReplicatedLog().lastTerm(); } /** - * @return the index from the last entry in the log + * Returns the index of the last entry in the log. + * + * @return the index */ protected long lastIndex() { return context.getReplicatedLog().lastIndex(); } /** - * @param logIndex - * @return the client request tracker for the specified logIndex + * Removes and returns the ClientRequestTracker for the specified log index. + * @param logIndex the log index + * @return the ClientRequestTracker or null if none available */ protected ClientRequestTracker removeClientRequestTracker(long logIndex) { return null; } /** + * Returns the actual index of the entry in replicated log for the given index or -1 if not found. * - * @return the log entry index for the given index or -1 if not found + * @return the log entry index or -1 if not found */ - protected long getLogEntryIndex(long index){ - if(index == context.getReplicatedLog().getSnapshotIndex()){ + protected long getLogEntryIndex(long index) { + if (index == context.getReplicatedLog().getSnapshotIndex()) { return context.getReplicatedLog().getSnapshotIndex(); } ReplicatedLogEntry entry = context.getReplicatedLog().get(index); - if(entry != null){ + if (entry != null) { return entry.getIndex(); } @@ -338,15 +344,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** - * @return the log entry term for the given index or -1 if not found + * Returns the actual term of the entry in replicated log for the given index or -1 if not found. + * + * @return the log entry term or -1 if not found */ - protected long getLogEntryTerm(long index){ - if(index == context.getReplicatedLog().getSnapshotIndex()){ + protected long getLogEntryTerm(long index) { + if (index == context.getReplicatedLog().getSnapshotIndex()) { return context.getReplicatedLog().getSnapshotTerm(); } ReplicatedLogEntry entry = context.getReplicatedLog().get(index); - if(entry != null){ + if (entry != null) { return entry.getTerm(); } @@ -354,9 +362,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** - * Apply the provided index to the state machine + * Applies the log entries up to the specified index that is known to be committed to the state machine. * - * @param index a log index that is known to be committed + * @param index the log index */ protected void applyLogToStateMachine(final long index) { long newLastApplied = context.getLastApplied(); @@ -381,14 +389,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either - LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", + log.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", logName(), i, i, index); break; } } - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied); - } + + log.debug("{}: Setting last applied to {}", logName(), newLastApplied); + context.setLastApplied(newLastApplied); // send a message to persist a ApplyLogEntries marker message into akka's persistent journal @@ -422,16 +430,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return internalSwitchBehavior(createBehavior(context, newState)); } + @SuppressWarnings("checkstyle:IllegalCatch") protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { - if(!context.getRaftPolicy().automaticElectionsEnabled()) { + if (!context.getRaftPolicy().automaticElectionsEnabled()) { return this; } - LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state()); + log.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state()); try { close(); - } catch (Exception e) { - LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e); + } catch (RuntimeException e) { + log.error("{}: Failed to close behavior : {}", logName(), this.state(), e); } return newBehavior; } @@ -462,20 +471,20 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { /** - * Performs a snapshot with no capture on the replicated log. - * It clears the log from the supplied index or last-applied-1 which ever is minimum. + * Performs a snapshot with no capture on the replicated log. It clears the log from the supplied index or + * lastApplied-1 which ever is minimum. * - * @param snapshotCapturedIndex + * @param snapshotCapturedIndex the index from which to clear */ protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex); - if(actualIndex != -1){ + if (actualIndex != -1) { setReplicatedToAllIndex(actualIndex); } } - protected String getId(){ + protected String getId() { return context.getId(); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 176704f3d3..b86d28b5f7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -23,7 +23,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; /** - * The behavior of a RaftActor when it is in the CandidateState + * The behavior of a RaftActor when it is in the Candidate raft state. *

      * Candidates (§5.2): *

        @@ -51,21 +51,19 @@ public class Candidate extends AbstractRaftActorBehavior { public Candidate(RaftActorContext context) { super(context, RaftState.Candidate); - for(PeerInfo peer: context.getPeers()) { - if(peer.isVoting()) { + for (PeerInfo peer: context.getPeers()) { + if (peer.isVoting()) { votingPeers.add(peer.getId()); } } - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers); - } + log.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers); votesRequired = getMajorityVoteCount(votingPeers.size()); startNewTerm(); - if(votingPeers.isEmpty()){ + if (votingPeers.isEmpty()) { actor().tell(ElectionTimeout.INSTANCE, actor()); } else { scheduleElection(electionDuration()); @@ -83,16 +81,13 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override - protected RaftActorBehavior handleAppendEntries(ActorRef sender, - AppendEntries appendEntries) { + protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); - } + log.debug("{}: handleAppendEntries: {}", logName(), appendEntries); // Some other candidate for the same term became a leader and sent us an append entry - if(currentTerm() == appendEntries.getTerm()){ - LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower", + if (currentTerm() == appendEntries.getTerm()) { + log.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower", logName(), currentTerm()); return switchBehavior(new Follower(context)); @@ -108,15 +103,15 @@ public class Candidate extends AbstractRaftActorBehavior { @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { - LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount); + log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount); if (requestVoteReply.isVoteGranted()) { voteCount++; } if (voteCount >= votesRequired) { - if(context.getLastApplied() < context.getReplicatedLog().lastIndex()) { - LOG.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(), + if (context.getLastApplied() < context.getReplicatedLog().lastIndex()) { + log.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(), context.getReplicatedLog().lastIndex()); return internalSwitchBehavior(RaftState.PreLeader); } else { @@ -130,7 +125,7 @@ public class Candidate extends AbstractRaftActorBehavior { @Override public RaftActorBehavior handleMessage(ActorRef sender, Object message) { if (message instanceof ElectionTimeout) { - LOG.debug("{}: Received ElectionTimeout", logName()); + log.debug("{}: Received ElectionTimeout", logName()); if (votesRequired == 0) { // If there are no peers then we should be a Leader @@ -151,10 +146,8 @@ public class Candidate extends AbstractRaftActorBehavior { RaftRPC rpc = (RaftRPC) message; - if(LOG.isDebugEnabled()) { - LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc, + log.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc, context.getTermInformation().getCurrentTerm()); - } // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) @@ -187,21 +180,21 @@ public class Candidate extends AbstractRaftActorBehavior { long newTerm = currentTerm + 1; context.getTermInformation().updateAndPersist(newTerm, context.getId()); - LOG.debug("{}: Starting new term {}", logName(), newTerm); + log.debug("{}: Starting new term {}", logName(), newTerm); // Request for a vote // TODO: Retry request for vote if replies do not arrive in a reasonable // amount of time TBD for (String peerId : votingPeers) { ActorSelection peerActor = context.getPeerActorSelection(peerId); - if(peerActor != null) { + if (peerActor != null) { RequestVote requestVote = new RequestVote( context.getTermInformation().getCurrentTerm(), context.getId(), context.getReplicatedLog().lastIndex(), context.getReplicatedLog().lastTerm()); - LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId); + log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId); peerActor.tell(requestVote, context.getActor()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index cb141f9f1e..3f68b50a4f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -40,7 +40,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; /** - * The behavior of a RaftActor in the Follower state + * The behavior of a RaftActor in the Follower raft state. *

        *

          *
        • Respond to RPCs from candidates and leaders @@ -57,7 +57,7 @@ public class Follower extends AbstractRaftActorBehavior { private final SyncStatusTracker initialSyncStatusTracker; private final Procedure appendAndPersistCallback = - logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry); + logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry); private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted(); private SnapshotTracker snapshotTracker = null; @@ -110,8 +110,8 @@ public class Follower extends AbstractRaftActorBehavior { lastLeaderMessageTimer.start(); } - private boolean isLogEntryPresent(long index){ - if(context.getReplicatedLog().isInSnapshot(index)) { + private boolean isLogEntryPresent(long index) { + if (context.getReplicatedLog().isInSnapshot(index)) { return true; } @@ -120,18 +120,18 @@ public class Follower extends AbstractRaftActorBehavior { } - private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){ - initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex()); + private void updateInitialSyncStatus(long currentLeaderCommit, String newLeaderId) { + initialSyncStatusTracker.update(newLeaderId, currentLeaderCommit, context.getCommitIndex()); } @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0; - if(LOG.isTraceEnabled()) { - LOG.trace("{}: handleAppendEntries: {}", logName(), appendEntries); - } else if(LOG.isDebugEnabled() && numLogEntries > 0) { - LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); + if (log.isTraceEnabled()) { + log.trace("{}: handleAppendEntries: {}", logName(), appendEntries); + } else if (log.isDebugEnabled() && numLogEntries > 0) { + log.debug("{}: handleAppendEntries: {}", logName(), appendEntries); } // TODO : Refactor this method into a bunch of smaller methods @@ -139,8 +139,8 @@ public class Follower extends AbstractRaftActorBehavior { // cover the code properly if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) { - LOG.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " + - "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId()); + log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " + + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId()); snapshotTracker = null; } @@ -149,9 +149,7 @@ public class Follower extends AbstractRaftActorBehavior { AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, lastIndex(), lastTerm(), context.getPayloadVersion()); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply); - } + log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply); sender.tell(reply, actor()); return this; @@ -169,7 +167,7 @@ public class Follower extends AbstractRaftActorBehavior { // We found that the log was out of sync so just send a negative // reply and return - LOG.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}", + log.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}", logName(), lastIndex, lastTerm()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, @@ -179,7 +177,7 @@ public class Follower extends AbstractRaftActorBehavior { if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { - LOG.debug("{}: Number of entries to be appended = {}", logName(), + log.debug("{}: Number of entries to be appended = {}", logName(), appendEntries.getEntries().size()); // 3. If an existing entry conflicts with a new one (same index @@ -192,35 +190,35 @@ public class Follower extends AbstractRaftActorBehavior { for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i); - if(!isLogEntryPresent(matchEntry.getIndex())) { + if (!isLogEntryPresent(matchEntry.getIndex())) { // newEntry not found in the log break; } long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex()); - LOG.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry, + log.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry, existingEntryTerm); // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know // what the term was so we'll assume it matches. - if(existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) { + if (existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) { continue; } - if(!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) { + if (!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) { - LOG.debug("{}: Removing entries from log starting at {}", logName(), + log.debug("{}: Removing entries from log starting at {}", logName(), matchEntry.getIndex()); // Entries do not match so remove all subsequent entries - if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) { + if (!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) { // Could not remove the entries - this means the matchEntry index must be in the // snapshot and not the log. In this case the prior entries are part of the state // so we must send back a reply to force a snapshot to completely re-sync the // follower's log and state. - LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName()); + log.debug("{}: Could not remove entries - sending reply to force snapshot", logName()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, lastTerm(), context.getPayloadVersion(), true), actor()); return this; @@ -236,23 +234,23 @@ public class Follower extends AbstractRaftActorBehavior { } lastIndex = lastIndex(); - LOG.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), + log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), lastIndex, addEntriesFrom); // 4. Append any new entries not already in the log for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { ReplicatedLogEntry entry = appendEntries.getEntries().get(i); - LOG.debug("{}: Append entry to log {}", logName(), entry.getData()); + log.debug("{}: Append entry to log {}", logName(), entry.getData()); context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback); - if(entry.getData() instanceof ServerConfigurationPayload) { + if (entry.getData() instanceof ServerConfigurationPayload) { context.updatePeerIds((ServerConfigurationPayload)entry.getData()); } } - LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size()); + log.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size()); } // 5. If leaderCommit > commitIndex, set commitIndex = @@ -261,23 +259,22 @@ public class Follower extends AbstractRaftActorBehavior { lastIndex = lastIndex(); long prevCommitIndex = context.getCommitIndex(); - if(appendEntries.getLeaderCommit() > prevCommitIndex) { + if (appendEntries.getLeaderCommit() > prevCommitIndex) { context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex)); } if (prevCommitIndex != context.getCommitIndex()) { - LOG.debug("{}: Commit index set to {}", logName(), context.getCommitIndex()); + log.debug("{}: Commit index set to {}", logName(), context.getCommitIndex()); } // If commitIndex > lastApplied: increment lastApplied, apply // log[lastApplied] to state machine (§5.3) // check if there are any entries to be applied. last-applied can be equal to last-index - if (appendEntries.getLeaderCommit() > context.getLastApplied() && - context.getLastApplied() < lastIndex) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: applyLogToStateMachine, " + - "appendEntries.getLeaderCommit(): {}," + - "context.getLastApplied(): {}, lastIndex(): {}", logName(), + if (appendEntries.getLeaderCommit() > context.getLastApplied() + && context.getLastApplied() < lastIndex) { + if (log.isDebugEnabled()) { + log.debug("{}: applyLogToStateMachine, appendEntries.getLeaderCommit(): {}," + + "context.getLastApplied(): {}, lastIndex(): {}", logName(), appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex); } @@ -287,10 +284,10 @@ public class Follower extends AbstractRaftActorBehavior { AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, lastIndex, lastTerm(), context.getPayloadVersion()); - if(LOG.isTraceEnabled()) { - LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply); - } else if(LOG.isDebugEnabled() && numLogEntries > 0) { - LOG.debug("{}: handleAppendEntries returning : {}", logName(), reply); + if (log.isTraceEnabled()) { + log.trace("{}: handleAppendEntries returning : {}", logName(), reply); + } else if (log.isDebugEnabled() && numLogEntries > 0) { + log.debug("{}: handleAppendEntries returning : {}", logName(), reply); } sender.tell(reply, actor()); @@ -316,39 +313,39 @@ public class Follower extends AbstractRaftActorBehavior { // an entry at prevLogIndex and this follower has no entries in // it's log. - LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}", + log.debug("{}: The followers log is empty and the senders prevLogIndex is {}", logName(), appendEntries.getPrevLogIndex()); } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) { // The follower's log is out of sync because the Leader's // prevLogIndex entry was not found in it's log - LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, snapshotIndex: {}", - logName(), appendEntries.getPrevLogIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex()); + log.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - " + + "lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex, + context.getReplicatedLog().getSnapshotIndex()); } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) { // The follower's log is out of sync because the Leader's // prevLogIndex entry does exist in the follower's log but it has // a different term in it - LOG.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries " + - "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), - prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex()); - } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 + log.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries" + + "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), + prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, + context.getReplicatedLog().getSnapshotIndex()); + } else if (appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 && appendEntries.getReplicatedToAllIndex() != -1 && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) { // This append entry comes from a leader who has it's log aggressively trimmed and so does not have // the previous entry in it's in-memory journal - LOG.debug( - "{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal", - logName(), appendEntries.getReplicatedToAllIndex()); - } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 + log.debug("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the" + + " in-memory journal", logName(), appendEntries.getReplicatedToAllIndex()); + } else if (appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) { - LOG.debug( - "{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal", - logName(), appendEntries.getEntries().get(0).getIndex() - 1); + log.debug("{}: Cannot append entries because the calculated previousIndex {} was not found in the " + + " in-memory journal", logName(), appendEntries.getEntries().get(0).getIndex() - 1); } else { outOfSync = false; } @@ -383,7 +380,7 @@ public class Follower extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term", + log.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); @@ -412,33 +409,33 @@ public class Follower extends AbstractRaftActorBehavior { // lastLeaderMessageTimer. long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS); long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis(); - boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || - lastLeaderMessageInterval >= electionTimeoutInMillis; + boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() + || lastLeaderMessageInterval >= electionTimeoutInMillis; - if(canStartElection()) { - if(message instanceof TimeoutNow) { - LOG.debug("{}: Received TimeoutNow - switching to Candidate", logName()); + if (canStartElection()) { + if (message instanceof TimeoutNow) { + log.debug("{}: Received TimeoutNow - switching to Candidate", logName()); return internalSwitchBehavior(RaftState.Candidate); - } else if(noLeaderMessageReceived) { + } else if (noLeaderMessageReceived) { // Check the cluster state to see if the leader is known to be up before we go to Candidate. // However if we haven't heard from the leader in a long time even though the cluster state // indicates it's up then something is wrong - leader might be stuck indefinitely - so switch // to Candidate, long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR; - if(isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) { - LOG.debug("{}: Received ElectionTimeout but leader appears to be available", logName()); + if (isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) { + log.debug("{}: Received ElectionTimeout but leader appears to be available", logName()); scheduleElection(electionDuration()); } else { - LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); + log.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); return internalSwitchBehavior(RaftState.Candidate); } } else { - LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}", + log.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}", logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval()); scheduleElection(electionDuration()); } - } else if(message instanceof ElectionTimeout) { - if(noLeaderMessageReceived) { + } else if (message instanceof ElectionTimeout) { + if (noLeaderMessageReceived) { setLeaderId(null); } @@ -449,17 +446,17 @@ public class Follower extends AbstractRaftActorBehavior { } private boolean isLeaderAvailabilityKnown() { - if(leaderId == null) { + if (leaderId == null) { return false; } Optional cluster = context.getCluster(); - if(!cluster.isPresent()) { + if (!cluster.isPresent()) { return false; } ActorSelection leaderActor = context.getPeerActorSelection(leaderId); - if(leaderActor == null) { + if (leaderActor == null) { return false; } @@ -468,43 +465,43 @@ public class Follower extends AbstractRaftActorBehavior { CurrentClusterState state = cluster.get().state(); Set unreachable = state.getUnreachable(); - LOG.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress, + log.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress, unreachable); - for(Member m: unreachable) { - if(leaderAddress.equals(m.address())) { - LOG.info("{}: Leader {} is unreachable", logName(), leaderAddress); + for (Member m: unreachable) { + if (leaderAddress.equals(m.address())) { + log.info("{}: Leader {} is unreachable", logName(), leaderAddress); return false; } } - for(Member m: state.getMembers()) { - if(leaderAddress.equals(m.address())) { - if(m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) { - LOG.debug("{}: Leader {} cluster status is {} - leader is available", logName(), + for (Member m: state.getMembers()) { + if (leaderAddress.equals(m.address())) { + if (m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) { + log.debug("{}: Leader {} cluster status is {} - leader is available", logName(), leaderAddress, m.status()); return true; } else { - LOG.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(), + log.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(), leaderAddress, m.status()); return false; } } } - LOG.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress); + log.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress); return false; } private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) { - LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot); + log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot); leaderId = installSnapshot.getLeaderId(); - if(snapshotTracker == null){ - snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId()); + if (snapshotTracker == null) { + snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId()); } updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId()); @@ -513,8 +510,8 @@ public class Follower extends AbstractRaftActorBehavior { final InstallSnapshotReply reply = new InstallSnapshotReply( currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true); - if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), - installSnapshot.getLastChunkHashCode())){ + if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), + installSnapshot.getLastChunkHashCode())) { Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(), new ArrayList<>(), installSnapshot.getLastIncludedIndex(), @@ -528,7 +525,7 @@ public class Follower extends AbstractRaftActorBehavior { ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override public void onSuccess() { - LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply); + log.debug("{}: handleInstallSnapshot returning: {}", logName(), reply); sender.tell(reply, actor()); } @@ -543,24 +540,17 @@ public class Follower extends AbstractRaftActorBehavior { snapshotTracker = null; } else { - LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply); + log.debug("{}: handleInstallSnapshot returning: {}", logName(), reply); sender.tell(reply, actor()); } } catch (SnapshotTracker.InvalidChunkException e) { - LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e); + log.debug("{}: Exception in InstallSnapshot of follower", logName(), e); sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor()); snapshotTracker = null; - } catch (Exception e){ - LOG.error("{}: Exception in InstallSnapshot of follower", logName(), e); - - //send reply with success as false. The chunk will be sent again on failure - sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), - installSnapshot.getChunkIndex(), false), actor()); - } } @@ -570,7 +560,7 @@ public class Follower extends AbstractRaftActorBehavior { } @VisibleForTesting - SnapshotTracker getSnapshotTracker(){ + SnapshotTracker getSnapshotTracker() { return snapshotTracker; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java index a02e40092b..658c129e2d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java @@ -44,7 +44,7 @@ public class IsolatedLeader extends AbstractLeader { // it can happen that this isolated leader interacts with a new leader in the cluster and // changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) { - LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId()); + log.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId()); return internalSwitchBehavior(new Leader(context, this)); } return ret; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 827364c29f..4821d98835 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -23,7 +23,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; /** - * The behavior of a RaftActor when it is in the Leader state + * The behavior of a RaftActor when it is in the Leader state. *

          * Leaders: *

            @@ -34,15 +34,14 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; * respond after entry applied to state machine (§5.3) *
          • If last log index ≥ nextIndex for a follower: send * AppendEntries RPC with log entries starting at nextIndex - *
              *
            • If successful: update nextIndex and matchIndex for * follower (§5.3) *
            • If AppendEntries fails because of log inconsistency: * decrement nextIndex and retry (§5.3) - *
            *
          • If there exists an N such that N > commitIndex, a majority * of matchIndex[i] ≥ N, and log[N].term == currentTerm: * set commitIndex = N (§5.3, §5.4). + *
          */ public class Leader extends AbstractLeader { /** @@ -53,7 +52,7 @@ public class Leader extends AbstractLeader { static final Object ISOLATED_LEADER_CHECK = new Object(); private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted(); - private @Nullable LeadershipTransferContext leadershipTransferContext; + @Nullable private LeadershipTransferContext leadershipTransferContext; Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) { super(context, RaftState.Leader, initializeFromLeader); @@ -69,7 +68,7 @@ public class Leader extends AbstractLeader { if (ISOLATED_LEADER_CHECK.equals(originalMessage)) { if (isLeaderIsolated()) { - LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", + log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId()); return internalSwitchBehavior(new IsolatedLeader(context, this)); } else { @@ -81,15 +80,16 @@ public class Leader extends AbstractLeader { } @Override - protected void beforeSendHeartbeat(){ - if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){ + protected void beforeSendHeartbeat() { + if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) + > context.getConfigParams().getIsolatedCheckIntervalInMillis()) { context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor()); isolatedLeaderCheck.reset().start(); } - if(leadershipTransferContext != null && leadershipTransferContext.isExpired( + if (leadershipTransferContext != null && leadershipTransferContext.isExpired( context.getConfigParams().getElectionTimeOutInterval().toMillis())) { - LOG.debug("{}: Leadership transfer expired", logName()); + log.debug("{}: Leadership transfer expired", logName()); leadershipTransferContext = null; } } @@ -117,10 +117,10 @@ public class Leader extends AbstractLeader { * {@link RaftActorLeadershipTransferCohort#abortTtransfer}.
        • *
        * - * @param leadershipTransferCohort + * @param leadershipTransferCohort the cohort participating in the leadership transfer */ public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) { - LOG.debug("{}: Attempting to transfer leadership", logName()); + log.debug("{}: Attempting to transfer leadership", logName()); leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort); @@ -129,23 +129,23 @@ public class Leader extends AbstractLeader { } private void tryToCompleteLeadershipTransfer(String followerId) { - if(leadershipTransferContext == null) { + if (leadershipTransferContext == null) { return; } FollowerLogInformation followerInfo = getFollower(followerId); - if(followerInfo == null) { + if (followerInfo == null) { return; } long lastIndex = context.getReplicatedLog().lastIndex(); boolean isVoting = context.getPeerInfo(followerId).isVoting(); - LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}", + log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}", logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting); - if(isVoting && followerInfo.getMatchIndex() == lastIndex) { - LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName()); + if (isVoting && followerInfo.getMatchIndex() == lastIndex) { + log.debug("{}: Follower's log matches - sending ElectionTimeout", logName()); // We can't be sure if the follower has applied all its log entries to its state so send an // additional AppendEntries with the latest commit index. @@ -155,7 +155,7 @@ public class Leader extends AbstractLeader { ActorSelection followerActor = context.getPeerActorSelection(followerId); followerActor.tell(TimeoutNow.INSTANCE, context.getActor()); - LOG.debug("{}: Leader transfer complete", logName()); + log.debug("{}: Leader transfer complete", logName()); leadershipTransferContext.transferCohort.transferComplete(); leadershipTransferContext = null; @@ -164,7 +164,7 @@ public class Leader extends AbstractLeader { @Override public void close() { - if(leadershipTransferContext != null) { + if (leadershipTransferContext != null) { LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext; leadershipTransferContext = null; localLeadershipTransferContext.transferCohort.abortTransfer(); @@ -192,7 +192,7 @@ public class Leader extends AbstractLeader { } boolean isExpired(long timeout) { - if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) { + if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) { transferCohort.abortTransfer(); return true; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java index eab3f7d7da..09f5186a01 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java @@ -22,7 +22,7 @@ import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; * the log with the leader's current term. Once the no-op entry is committed, all prior entries are committed * indirectly. Once all entries are committed, ie commitIndex matches the last log index, it switches to the * normal Leader state. - *

        + *

        * The use of a no-op entry in this manner is outlined in the last paragraph in §8 of the * extended raft version. * @@ -39,7 +39,7 @@ public class PreLeader extends AbstractLeader { @Override public RaftActorBehavior handleMessage(ActorRef sender, Object message) { if (message instanceof ApplyState) { - if(context.getLastApplied() >= context.getReplicatedLog().lastIndex()) { + if (context.getLastApplied() >= context.getReplicatedLog().lastIndex()) { // We've applied all entries - we can switch to Leader. return internalSwitchBehavior(new Leader(context, this)); } else { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java index bf5116935d..6b51c34e88 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java @@ -13,19 +13,10 @@ import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftState; /** - * A RaftActorBehavior represents the specific behavior of a RaftActor - *

        - * A RaftActor can behave as one of the following, - *

          - *
        • Follower
        • - *
        • Candidate
        • - *
        • Leader
        • - *
        - *

        - * In each of these behaviors the Raft Actor handles the same Raft messages - * differently. + * The interface for a class that implements a specific behavior of a RaftActor. The types of behaviors are enumerated + * by {@link RaftState}. Each handles the same Raft messages differently. */ -public interface RaftActorBehavior extends AutoCloseable{ +public interface RaftActorBehavior extends AutoCloseable { /** * Handle a message. If the processing of the message warrants a state @@ -37,42 +28,50 @@ public interface RaftActorBehavior extends AutoCloseable{ * * @return The new behavior or current behavior, or null if the message was not handled. */ - @Nullable RaftActorBehavior handleMessage(ActorRef sender, Object message); + @Nullable + RaftActorBehavior handleMessage(ActorRef sender, Object message); /** + * Returns the state associated with this behavior. * - * @return The state associated with a given behavior + * @return the RaftState */ RaftState state(); /** + * Returns the id of the leader. * - * @return The Id of the Leader if known else null + * @return the id of the leader or null if not known */ + @Nullable String getLeaderId(); /** - * setting the index of the log entry which is replicated to all nodes - * @param replicatedToAllIndex + * Sets the index of the last log entry that has been replicated to all peers. + * + * @param replicatedToAllIndex the index */ void setReplicatedToAllIndex(long replicatedToAllIndex); /** - * @return the index of the log entry which is replicated to all nodes + * Returns the index of the last log entry that has been replicated to all peers. + * + * @return the index or -1 if not known */ long getReplicatedToAllIndex(); /** - * @return the leader's payload data version. + * Returns the leader's payload data version. + * + * @return a short representing the version */ short getLeaderPayloadVersion(); /** - * switchBehavior makes sure that the current behavior is shutdown before it switches to the new - * behavior + * Closes the current behavior and switches to the specified behavior, if possible. * - * @param behavior The new behavior to switch to - * @return The new behavior + * @param behavior the new behavior to switch to + * @return the new behavior */ RaftActorBehavior switchBehavior(RaftActorBehavior behavior); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java index fadca3b15d..3ba020b814 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -15,10 +15,10 @@ import java.util.Arrays; import org.slf4j.Logger; /** - * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower + * Helper class that maintains state for a snapshot that is being installed in chunks on a Follower. */ public class SnapshotTracker { - private final Logger LOG; + private final Logger log; private final int totalChunks; private final String leaderId; private ByteString collectedChunks = ByteString.EMPTY; @@ -26,37 +26,39 @@ public class SnapshotTracker { private boolean sealed = false; private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE; - SnapshotTracker(Logger LOG, int totalChunks, String leaderId) { - this.LOG = LOG; + SnapshotTracker(Logger log, int totalChunks, String leaderId) { + this.log = log; this.totalChunks = totalChunks; this.leaderId = Preconditions.checkNotNull(leaderId); } /** - * Adds a chunk to the tracker + * Adds a chunk to the tracker. * - * @param chunkIndex - * @param chunk - * @return true when the lastChunk is received - * @throws InvalidChunkException + * @param chunkIndex the index of the chunk + * @param chunk the chunk data + * @param lastChunkHashCode the optional hash code for the chunk + * @return true if this is the last chunk is received + * @throws InvalidChunkException if the chunk index is invalid or out of order */ - boolean addChunk(int chunkIndex, byte[] chunk, Optional lastChunkHashCode) throws InvalidChunkException{ - LOG.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}", + boolean addChunk(int chunkIndex, byte[] chunk, Optional maybeLastChunkHashCode) + throws InvalidChunkException { + log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}", chunkIndex, lastChunkIndex, collectedChunks.size(), this.lastChunkHashCode); - if(sealed){ - throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received"); + if (sealed) { + throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + + " all chunks already received"); } - if(lastChunkIndex + 1 != chunkIndex){ + if (lastChunkIndex + 1 != chunkIndex) { throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex); } - if(lastChunkHashCode.isPresent()){ - if(lastChunkHashCode.get() != this.lastChunkHashCode){ - throw new InvalidChunkException("The hash code of the recorded last chunk does not match " + - "the senders hash code, expected " + this.lastChunkHashCode + " was " + lastChunkHashCode.get()); - } + if (maybeLastChunkHashCode.isPresent() && maybeLastChunkHashCode.get() != this.lastChunkHashCode) { + throw new InvalidChunkException("The hash code of the recorded last chunk does not match " + + "the senders hash code, expected " + this.lastChunkHashCode + " was " + + maybeLastChunkHashCode.get()); } sealed = chunkIndex == totalChunks; @@ -66,15 +68,15 @@ public class SnapshotTracker { return sealed; } - byte[] getSnapshot(){ - if(!sealed) { + byte[] getSnapshot() { + if (!sealed) { throw new IllegalStateException("lastChunk not received yet"); } return collectedChunks.toByteArray(); } - ByteString getCollectedChunks(){ + ByteString getCollectedChunks() { return collectedChunks; } @@ -85,9 +87,8 @@ public class SnapshotTracker { public static class InvalidChunkException extends Exception { private static final long serialVersionUID = 1L; - InvalidChunkException(String message){ + InvalidChunkException(String message) { super(message); } } - } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java index 85622a5908..0986565bac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java @@ -40,32 +40,32 @@ public class SyncStatusTracker { this.syncThreshold = syncThreshold; } - public void update(String leaderId, long leaderCommit, long commitIndex){ - leaderId = Preconditions.checkNotNull(leaderId, "leaderId should not be null"); + public void update(String leaderId, long leaderCommit, long commitIndex) { + Preconditions.checkNotNull(leaderId, "leaderId should not be null"); - if(!leaderId.equals(syncedLeaderId)){ + if (!leaderId.equals(syncedLeaderId)) { minimumExpectedIndex = leaderCommit; changeSyncStatus(NOT_IN_SYNC, FORCE_STATUS_CHANGE); syncedLeaderId = leaderId; return; } - if((leaderCommit - commitIndex) > syncThreshold){ + if (leaderCommit - commitIndex > syncThreshold) { changeSyncStatus(NOT_IN_SYNC); - } else if((leaderCommit - commitIndex) <= syncThreshold && commitIndex >= minimumExpectedIndex) { + } else if (leaderCommit - commitIndex <= syncThreshold && commitIndex >= minimumExpectedIndex) { changeSyncStatus(IN_SYNC); } } - private void changeSyncStatus(boolean newSyncStatus){ + private void changeSyncStatus(boolean newSyncStatus) { changeSyncStatus(newSyncStatus, !FORCE_STATUS_CHANGE); } - private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange){ - if(syncStatus == newSyncStatus && !forceStatusChange){ + private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange) { + if (syncStatus == newSyncStatus && !forceStatusChange) { return; } actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender()); syncStatus = newSyncStatus; } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java index 6080232538..8f23a89b8b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java @@ -15,7 +15,7 @@ import java.io.Serializable; * Request to locate the leader raft actor. Each {@link org.opendaylight.controller.cluster.raft.RaftActor} must * respond with a {@link FindLeaderReply} containing the address of the leader, as it is known to that particular * actor. - * + *

        * This message is intended for testing purposes only. */ @VisibleForTesting diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java index 549fe038ed..5fbeb92fbd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java @@ -17,7 +17,7 @@ import javax.annotation.Nullable; * Reply to {@link FindLeader} message, containing the address of the leader actor, as known to the raft actor which * sent the message. If the responding actor does not have knowledge of the leader, {@link #getLeaderActor()} will * return {@link Optional#empty()}. - * + *

        * This message is intended for testing purposes only. */ @VisibleForTesting diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java index 0bd85b1e6d..cf9bb620dd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java @@ -13,7 +13,7 @@ import java.util.List; import java.util.Map; /** - * The response to a GetOnDemandRaftState message, + * The response to a GetOnDemandRaftState message. * * @author Thomas Pantelis */ diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java index 0aabb49f6b..fc5255e271 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java @@ -14,7 +14,7 @@ public class AbstractRaftRPC implements RaftRPC { // term private long term; - protected AbstractRaftRPC(long term){ + protected AbstractRaftRPC(long term) { this.term = term; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index dc1ebf0730..487ea9b09d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -34,8 +34,7 @@ public class AppendEntries extends AbstractRaftRPC { // term of prevLogIndex entry private final long prevLogTerm; - // log entries to store (empty for heartbeat; - // may send more than one for efficiency) + // log entries to store (empty for heart beat - may send more than one for efficiency) private transient List entries; // leader's commitIndex @@ -105,6 +104,9 @@ public class AppendEntries extends AbstractRaftRPC { private AppendEntries appendEntries; + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { } @@ -123,7 +125,7 @@ public class AppendEntries extends AbstractRaftRPC { out.writeShort(appendEntries.payloadVersion); out.writeInt(appendEntries.entries.size()); - for(ReplicatedLogEntry e: appendEntries.entries) { + for (ReplicatedLogEntry e: appendEntries.entries) { out.writeLong(e.getIndex()); out.writeLong(e.getTerm()); out.writeObject(e.getData()); @@ -142,7 +144,7 @@ public class AppendEntries extends AbstractRaftRPC { int size = in.readInt(); List entries = new ArrayList<>(size); - for(int i = 0; i < size; i++) { + for (int i = 0; i < size; i++) { entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java index 7784c7eec9..e2f0ba9014 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java @@ -15,7 +15,7 @@ import java.io.ObjectOutput; import org.opendaylight.controller.cluster.raft.RaftVersions; /** - * Reply for the AppendEntriesRpc message + * Reply for the AppendEntries message. */ public class AppendEntriesReply extends AbstractRaftRPC { private static final long serialVersionUID = -7487547356392536683L; @@ -110,6 +110,9 @@ public class AppendEntriesReply extends AbstractRaftRPC { private AppendEntriesReply appendEntriesReply; + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java index 5958cfdedf..4528dcbf5e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -15,6 +15,9 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +/** + * Message sent from a leader to install a snapshot chunk on a follower. + */ public class InstallSnapshot extends AbstractRaftRPC { private static final long serialVersionUID = 1L; @@ -28,7 +31,8 @@ public class InstallSnapshot extends AbstractRaftRPC { private final Optional serverConfig; public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, byte[] data, - int chunkIndex, int totalChunks, Optional lastChunkHashCode, Optional serverConfig) { + int chunkIndex, int totalChunks, Optional lastChunkHashCode, + Optional serverConfig) { super(term); this.leaderId = leaderId; this.lastIncludedIndex = lastIncludedIndex; @@ -100,6 +104,9 @@ public class InstallSnapshot extends AbstractRaftRPC { private InstallSnapshot installSnapshot; + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { } @@ -117,12 +124,12 @@ public class InstallSnapshot extends AbstractRaftRPC { out.writeInt(installSnapshot.totalChunks); out.writeByte(installSnapshot.lastChunkHashCode.isPresent() ? 1 : 0); - if(installSnapshot.lastChunkHashCode.isPresent()) { + if (installSnapshot.lastChunkHashCode.isPresent()) { out.writeInt(installSnapshot.lastChunkHashCode.get().intValue()); } out.writeByte(installSnapshot.serverConfig.isPresent() ? 1 : 0); - if(installSnapshot.serverConfig.isPresent()) { + if (installSnapshot.serverConfig.isPresent()) { out.writeObject(installSnapshot.serverConfig.get()); } @@ -140,13 +147,13 @@ public class InstallSnapshot extends AbstractRaftRPC { Optional lastChunkHashCode = Optional.absent(); boolean chunkHashCodePresent = in.readByte() == 1; - if(chunkHashCodePresent) { + if (chunkHashCodePresent) { lastChunkHashCode = Optional.of(in.readInt()); } Optional serverConfig = Optional.absent(); boolean serverConfigPresent = in.readByte() == 1; - if(serverConfigPresent) { + if (serverConfigPresent) { serverConfig = Optional.of((ServerConfigurationPayload)in.readObject()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java index 41d2ae68c9..4f0ca1320f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java @@ -58,6 +58,9 @@ public class InstallSnapshotReply extends AbstractRaftRPC { private InstallSnapshotReply installSnapshotReply; + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java index ea56aeca56..bbb1222eff 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java @@ -25,8 +25,6 @@ public final class RequestVoteReply extends AbstractRaftRPC { @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("RequestVoteReply [term=").append(getTerm()).append(", voteGranted=").append(voteGranted).append("]"); - return builder.toString(); + return "RequestVoteReply [term=" + getTerm() + ", voteGranted=" + voteGranted + "]"; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerRemoved.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerRemoved.java index fba601cda6..3375137ada 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerRemoved.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerRemoved.java @@ -30,8 +30,6 @@ public class ServerRemoved implements Serializable { @Override public String toString() { - return "ServerRemoved{" + - "serverId='" + serverId + '\'' + - '}'; + return "ServerRemoved [serverId=" + serverId + "]"; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java index 4d66fa6d4d..f5c44f87e4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java @@ -8,14 +8,14 @@ package org.opendaylight.controller.cluster.raft.messages; /** - * Local message sent to self on receiving InstallSnapshotReply from a follower, this message indicates that - * the catchup of the follower is done succesfully during AddServer scenario + * Local message sent to self on receiving the InstallSnapshotReply from a follower indicating that + * the catch up of the follower has completed successfully for an AddServer operation. */ public class UnInitializedFollowerSnapshotReply { private final String followerId; - public UnInitializedFollowerSnapshotReply(String followerId){ - this.followerId = followerId; + public UnInitializedFollowerSnapshotReply(String followerId) { + this.followerId = followerId; } public String getFollowerId() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java index a21c959a9a..cad23a9398 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java @@ -27,6 +27,9 @@ public class ApplyJournalEntries implements Serializable, MigratedSerializable { private ApplyJournalEntries applyEntries; + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { // For Externalizable } @@ -38,7 +41,7 @@ public class ApplyJournalEntries implements Serializable, MigratedSerializable { @Override public void writeExternal(final ObjectOutput out) throws IOException { out.writeLong(applyEntries.toIndex); - } + } @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntries.java index 5650360c5f..138ea0cfcf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/DeleteEntries.java @@ -24,6 +24,9 @@ public class DeleteEntries implements Serializable, MigratedSerializable { private DeleteEntries deleteEntries; + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { // For Externalizable } @@ -35,7 +38,7 @@ public class DeleteEntries implements Serializable, MigratedSerializable { @Override public void writeExternal(final ObjectOutput out) throws IOException { out.writeLong(deleteEntries.fromIndex); - } + } @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java index bd9bb3e417..0f206b5d39 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java @@ -33,6 +33,9 @@ public final class ServerConfigurationPayload extends Payload implements Persist private List serverConfig; + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { // For Externalizable } @@ -48,7 +51,7 @@ public final class ServerConfigurationPayload extends Payload implements Persist out.writeObject(i.getId()); out.writeBoolean(i.isVoting()); } - } + } @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTerm.java index ae86921b92..55096d677e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTerm.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/UpdateElectionTerm.java @@ -22,6 +22,9 @@ public class UpdateElectionTerm implements Serializable, MigratedSerializable { private UpdateElectionTerm updateElectionTerm; + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { // For Externalizable } @@ -34,7 +37,7 @@ public class UpdateElectionTerm implements Serializable, MigratedSerializable { public void writeExternal(final ObjectOutput out) throws IOException { out.writeLong(updateElectionTerm.currentTerm); out.writeObject(updateElectionTerm.votedFor); - } + } @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java index a66caa456a..c535c2299d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java @@ -13,7 +13,7 @@ package org.opendaylight.controller.cluster.raft.policy; * we may want to be able to determine which Raft replica should become the leader - with Raft elections are * randomized so it is not possible to specify which replica should be the leader. The ability to specify * the leader would be quite useful when testing a raft cluster. - * + *

        * Similarly we may want to customize when exactly we apply a modification to the state - with Raft a modification * is only applied to the state when the modification is replicated to a majority of the replicas. The ability to * apply a modification to the state before consensus would be useful in scenarios where you have only 2 nodes @@ -31,10 +31,10 @@ public interface RaftPolicy { /** * According to Raft consensus on a Raft entry is achieved only after a Leader replicates a log entry to a - * majority of it's followers + * majority of it's followers. * * @return true if modification should be applied before consensus, false to apply modification to state - * as per Raft + * as per Raft */ boolean applyModificationToStateBeforeConsensus(); }