Fixed a lot of checkstyle warnings and cleaned up javadocs for classes in the
org.opendaylight.controller.cluster.raft package.
Change-Id: I67dd997701fe6eaf6c87e77954a4c1d4aa5fda69
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
32 files changed:
private long previousSnapshotTerm = -1;
private int dataSize = 0;
private long previousSnapshotTerm = -1;
private int dataSize = 0;
- public AbstractReplicatedLogImpl(long snapshotIndex,
- long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
+ protected AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm,
+ List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
this.logContext = logContext;
this.journal = new ArrayList<>(unAppliedEntries.size());
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
this.logContext = logContext;
this.journal = new ArrayList<>(unAppliedEntries.size());
- for(ReplicatedLogEntry entry: unAppliedEntries) {
+ for (ReplicatedLogEntry entry: unAppliedEntries) {
- public AbstractReplicatedLogImpl() {
+ protected AbstractReplicatedLogImpl() {
this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
}
this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
}
- for(int i = adjustedIndex; i < journal.size(); i++) {
+ for (int i = adjustedIndex; i < journal.size(); i++) {
dataSize -= journal.get(i).size();
}
dataSize -= journal.get(i).size();
}
@Override
public boolean append(ReplicatedLogEntry replicatedLogEntry) {
@Override
public boolean append(ReplicatedLogEntry replicatedLogEntry) {
- if(replicatedLogEntry.getIndex() > lastIndex()) {
+ if (replicatedLogEntry.getIndex() > lastIndex()) {
journal.add(replicatedLogEntry);
dataSize += replicatedLogEntry.size();
return true;
journal.add(replicatedLogEntry);
dataSize += replicatedLogEntry.size();
return true;
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
int maxIndex = adjustedIndex + maxEntries;
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
int maxIndex = adjustedIndex + maxEntries;
- if(maxDataSize == NO_MAX_SIZE) {
+ if (maxDataSize == NO_MAX_SIZE) {
return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
} else {
return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
} else {
- List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
- long totalSize = 0;
- for(int i = adjustedIndex; i < maxIndex; i++) {
- ReplicatedLogEntry entry = journal.get(i);
- totalSize += entry.size();
- if(totalSize <= maxDataSize) {
- retList.add(entry);
- } else {
- if(retList.isEmpty()) {
- // Edge case - the first entry's size exceeds the threshold. We need to return
- // at least the first entry so add it here.
- retList.add(entry);
- }
-
- break;
- }
- }
-
- return retList;
+ return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
}
} else {
return Collections.emptyList();
}
}
}
} else {
return Collections.emptyList();
}
}
+ private List<ReplicatedLogEntry> copyJournalEntries(int fromIndex, int toIndex, long maxDataSize) {
+ List<ReplicatedLogEntry> retList = new ArrayList<>(toIndex - fromIndex);
+ long totalSize = 0;
+ for (int i = fromIndex; i < toIndex; i++) {
+ ReplicatedLogEntry entry = journal.get(i);
+ totalSize += entry.size();
+ if (totalSize <= maxDataSize) {
+ retList.add(entry);
+ } else {
+ if (retList.isEmpty()) {
+ // Edge case - the first entry's size exceeds the threshold. We need to return
+ // at least the first entry so add it here.
+ retList.add(entry);
+ }
+
+ break;
+ }
+ }
+
+ return retList;
+ }
+
@Override
public long size() {
@Override
public long size() {
return false;
}
int adjustedIndex = adjustedIndex(logEntryIndex);
return false;
}
int adjustedIndex = adjustedIndex(logEntryIndex);
- return (adjustedIndex >= 0);
+ return adjustedIndex >= 0;
snapshottedJournal = new ArrayList<>(journal.size());
snapshottedJournal = new ArrayList<>(journal.size());
- List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
+ List<ReplicatedLogEntry> snapshotJournalEntries =
+ journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
snapshottedJournal.addAll(snapshotJournalEntries);
snapshotJournalEntries.clear();
snapshottedJournal.addAll(snapshotJournalEntries);
snapshotJournalEntries.clear();
previousSnapshotTerm = -1;
dataSize = 0;
// need to recalc the datasize based on the entries left after precommit.
previousSnapshotTerm = -1;
dataSize = 0;
// need to recalc the datasize based on the entries left after precommit.
- for(ReplicatedLogEntry logEntry : journal) {
+ for (ReplicatedLogEntry logEntry : journal) {
dataSize += logEntry.size();
}
dataSize += logEntry.size();
}
public interface ClientRequestTracker {
/**
public interface ClientRequestTracker {
/**
+ * Returns the client actor that should be sent a response when consensus is achieved.
- * @return the client actor that should be sent a response when consensus is achieved
+ * @return the client actor
*/
ActorRef getClientActor();
/**
*/
ActorRef getClientActor();
/**
+ * Returns the identifier of the object that is to be replicated. For example a transaction identifier in the case
+ * of a transaction.
- * @return the identifier of the object that is to be replicated. For example a transaction identifier in the case
- * of a transaction
+ * @return the identifier
*/
Identifier getIdentifier();
/**
*/
Identifier getIdentifier();
/**
+ * Returns the index of the log entry that is to be replicated.
- * @return the index of the log entry that is to be replicated
*/
public interface ConfigParams {
/**
*/
public interface ConfigParams {
/**
- * The minimum number of entries to be present in the in-memory Raft log
- * for a snapshot to be taken
+ * Returns the minimum number of entries to be present in the in-memory Raft log for a snapshot to be taken.
+ * @return the minimum number of entries.
*/
long getSnapshotBatchCount();
/**
*/
long getSnapshotBatchCount();
/**
- * The percentage of total memory in the in-memory Raft log before a snapshot
- * is to be taken
+ * Returns the percentage of total memory used in the in-memory Raft log before a snapshot should be taken.
+ * @return the percentage.
*/
int getSnapshotDataThresholdPercentage();
/**
*/
int getSnapshotDataThresholdPercentage();
/**
- * The interval at which a heart beat message will be sent to the remote
- * RaftActor
+ * Returns the interval at which a heart beat message should be sent to remote followers.
- * @return FiniteDuration
+ * @return the interval as a FiniteDuration.
*/
FiniteDuration getHeartBeatInterval();
/**
*/
FiniteDuration getHeartBeatInterval();
/**
- * The interval in which a new election would get triggered if no leader is found
+ * Returns the interval after which a new election should be triggered if no leader is available.
- * Normally its set to atleast twice the heart beat interval
- *
- * @return FiniteDuration
+ * @return the interval as a FiniteDuration.
*/
FiniteDuration getElectionTimeOutInterval();
/**
*/
FiniteDuration getElectionTimeOutInterval();
/**
- * The maximum election time variance. The election is scheduled using both
- * the Election Timeout and Variance
+ * Returns the maximum election time variance. The election is scheduled using both the election timeout and variance.
+ * @return the election time variance.
*/
int getElectionTimeVariance();
/**
*/
int getElectionTimeVariance();
/**
- * The size (in bytes) of the snapshot chunk sent from Leader
+ * Returns the maximum size (in bytes) for the snapshot chunk sent from a Leader.
+ *
+ * @return the maximum size (in bytes).
*/
int getSnapshotChunkSize();
/**
*/
int getSnapshotChunkSize();
/**
- * The number of journal log entries to batch on recovery before applying.
+ * Returns the maximum number of journal log entries to batch on recovery before applying.
+ *
+ * @return the maximum number of journal log entries.
*/
int getJournalRecoveryLogBatchSize();
/**
*/
int getJournalRecoveryLogBatchSize();
/**
- * The interval in which the leader needs to check itself if its isolated
- * @return FiniteDuration
+ * Returns the interval in which the leader needs to check if its isolated.
+ *
+ * @return the interval in ms.
*/
long getIsolatedCheckIntervalInMillis();
/**
*/
long getIsolatedCheckIntervalInMillis();
/**
- * The multiplication factor to be used to determine shard election timeout. The election timeout
- * is determined by multiplying the election timeout factor with the heartbeat duration.
+ * Returns the multiplication factor to be used to determine the shard election timeout. The election timeout
+ * is determined by multiplying the election timeout factor with the heart beat duration.
+ *
+ * @return the election timeout factor.
*/
long getElectionTimeoutFactor();
/**
*/
long getElectionTimeoutFactor();
/**
+ * Returns the RaftPolicy used to determine certain Raft behaviors.
- * @return An instance of org.opendaylight.controller.cluster.raft.policy.RaftPolicy or an instance of the
- * DefaultRaftPolicy
+ * @return an instance of org.opendaylight.controller.cluster.raft.policy.RaftPolicy, if set, or an instance of the
+ * DefaultRaftPolicy.
RaftPolicy getRaftPolicy();
/**
* Returns the PeerAddressResolver.
RaftPolicy getRaftPolicy();
/**
* Returns the PeerAddressResolver.
+ *
+ * @return the PeerAddressResolver instance.
- @Nonnull PeerAddressResolver getPeerAddressResolver();
+ @Nonnull
+ PeerAddressResolver getPeerAddressResolver();
- * @return the RaftPolicy class used by this configuration
+ * Returns the custom RaftPolicy class name.
+ *
+ * @return the RaftPolicy class name or null if none set.
*/
String getCustomRaftPolicyImplementationClass();
*/
String getCustomRaftPolicyImplementationClass();
String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
LOG.info("Trying to use custom RaftPolicy {}", className);
Class<?> c = Class.forName(className);
String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
LOG.info("Trying to use custom RaftPolicy {}", className);
Class<?> c = Class.forName(className);
- RaftPolicy obj = (RaftPolicy)c.newInstance();
- return obj;
+ return (RaftPolicy)c.newInstance();
} catch (Exception e) {
if(LOG.isDebugEnabled()) {
LOG.error("Could not create custom raft policy, will stick with default", e);
} catch (Exception e) {
if(LOG.isDebugEnabled()) {
LOG.error("Could not create custom raft policy, will stick with default", e);
package org.opendaylight.controller.cluster.raft;
package org.opendaylight.controller.cluster.raft;
+import javax.annotation.Nullable;
+
/**
* ElectionTerm contains information about a RaftActors election term.
* <p>
* This information includes the last known current term of the RaftActor
/**
* ElectionTerm contains information about a RaftActors election term.
* <p>
* This information includes the last known current term of the RaftActor
- * and which peer was voted for by the RaftActor in that term
+ * and which candidate was voted for by the RaftActor in that term.
- * This class ensures that election term information is persisted
+ * This class ensures that election term information is persisted.
*/
public interface ElectionTerm {
/**
*/
public interface ElectionTerm {
/**
- * latest term server has seen (initialized to 0
- * on first boot, increases monotonically)
+ * Returns the current leader's Raft term.
+ *
+ * @return the current leader's Raft term.
*/
long getCurrentTerm();
/**
*/
long getCurrentTerm();
/**
- * candidateId that received vote in current
- * term (or null if none)
+ * Returns the id of the candidate that this server voted for in current term.
+ *
+ * @return candidate id that received the vote or null if no candidate was voted for.
String getVotedFor();
/**
String getVotedFor();
/**
- * To be called mainly when we are recovering in-memory election state from
- * persistent storage
+ * This method updates the in-memory election term state. This method should be called when recovering election
+ * state from persistent storage.
- * @param currentTerm
- * @param votedFor
+ * @param term the election term.
+ * @param votedFor the candidate id that was voted for.
- void update(long currentTerm, String votedFor);
+ void update(long term, @Nullable String votedFor);
- * To be called when we need to update the current term either because we
- * received a message from someone with a more up-to-date term or because we
- * just voted for someone
- * <p>
- * This information needs to be persisted so that on recovery the replica
- * can start itself in the right term and know if it has already voted in
- * that term or not
+ * This method updates the in-memory election term state and persists it so it can be recovered on next restart.
+ * This method should be called when starting a new election or when a Raft RPC message is received with a higher
+ * term.
- * @param currentTerm
- * @param votedFor
+ * @param term the election term.
+ * @param votedFor the candidate id that was voted for.
- void updateAndPersist(long currentTerm, String votedFor);
+ void updateAndPersist(long term, @Nullable String votedFor);
* Implementation of ElectionTerm for the RaftActor.
*/
class ElectionTermImpl implements ElectionTerm {
* Implementation of ElectionTerm for the RaftActor.
*/
class ElectionTermImpl implements ElectionTerm {
- /**
- * Identifier of the actor whose election term information this is
- */
private long currentTerm = 0;
private String votedFor = null;
private long currentTerm = 0;
private String votedFor = null;
- @Override public void update(long currentTerm, String votedFor) {
- if(log.isDebugEnabled()) {
- log.debug("{}: Set currentTerm={}, votedFor={}", logId, currentTerm, votedFor);
- }
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
+ @Override
+ public void update(long newTerm, String newVotedFor) {
+ log.debug("{}: Set currentTerm={}, votedFor={}", logId, newTerm, newVotedFor);
+ this.currentTerm = newTerm;
+ this.votedFor = newVotedFor;
- public void updateAndPersist(long currentTerm, String votedFor){
- update(currentTerm, votedFor);
+ public void updateAndPersist(long newTerm, String newVotedFor) {
+ update(newTerm, newVotedFor);
// FIXME : Maybe first persist then update the state
persistence.persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), NoopProcedure.instance());
}
// FIXME : Maybe first persist then update the state
persistence.persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), NoopProcedure.instance());
}
-}
\ No newline at end of file
*/
package org.opendaylight.controller.cluster.raft;
*/
package org.opendaylight.controller.cluster.raft;
+import com.google.common.annotations.VisibleForTesting;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
/**
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
/**
- * The state of the followers log as known by the Leader
+ * The state of the followers log as known by the Leader.
*/
public interface FollowerLogInformation {
/**
*/
public interface FollowerLogInformation {
/**
- * Increment the value of the nextIndex
+ * Increments the value of the follower's next index.
- * @return the new value of nextIndex
+ * @return the new value of nextIndex.
*/
long incrNextIndex();
/**
*/
long incrNextIndex();
/**
- * Decrement the value of the nextIndex
+ * Decrements the value of the follower's next index.
- * @return the new value of nextIndex
+ * @return the new value of nextIndex,
*/
long decrNextIndex();
/**
*/
long decrNextIndex();
/**
- * Sets the index of the next log entry for this follower.
+ * Sets the index of the follower's next log entry.
+ * @param nextIndex the new index.
* @return true if the new index differed from the current index and the current index was updated, false
* otherwise.
*/
boolean setNextIndex(long nextIndex);
/**
* @return true if the new index differed from the current index and the current index was updated, false
* otherwise.
*/
boolean setNextIndex(long nextIndex);
/**
- * Increment the value of the matchIndex
+ * Increments the value of the follower's match index.
- * @return the new value of matchIndex
+ * @return the new value of matchIndex.
*/
long incrMatchIndex();
/**
*/
long incrMatchIndex();
/**
- * Sets the index of the highest log entry for this follower.
+ * Sets the index of the follower's highest log entry.
+ * @param matchIndex the new index.
* @return true if the new index differed from the current index and the current index was updated, false
* otherwise.
*/
boolean setMatchIndex(long matchIndex);
/**
* @return true if the new index differed from the current index and the current index was updated, false
* otherwise.
*/
boolean setMatchIndex(long matchIndex);
/**
+ * Returns the identifier of the follower.
- * @return the identifier of the follower. This could simply be the url of the remote actor.
+ * @return the identifier of the follower.
- * @return index of the next log entry to send to that server (initialized to leader last log index + 1)
+ * Returns the index of the next log entry to send to the follower.
+ *
+ * @return index of the follower's next log entry.
*/
long getNextIndex();
/**
*/
long getNextIndex();
/**
- * @return index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
+ * Returns the index of highest log entry known to be replicated on the follower.
+ *
+ * @return the index of highest log entry.
*/
long getMatchIndex();
/**
*/
long getMatchIndex();
/**
- * Checks if the follower is active by comparing the last updated with the duration
+ * Checks if the follower is active by comparing the time of the last activity with the election time out. The
+ * follower is active if some activity has occurred for the follower within the election time out interval.
- * @return true if follower is active, false otherwise
+ * @return true if follower is active, false otherwise.
*/
boolean isFollowerActive();
/**
*/
boolean isFollowerActive();
/**
- * restarts the timeout clock of the follower
+ * Marks the follower as active. This should be called when some activity has occurred for the follower.
*/
void markFollowerActive();
/**
*/
void markFollowerActive();
/**
- * This will stop the timeout clock
+ * Marks the follower as inactive. This should only be called from unit tests.
void markFollowerInActive();
/**
void markFollowerInActive();
/**
- * This will return the active time of follower, since it was last reset
+ * Returns the time since the last activity occurred for the follower.
- * @return time in milliseconds since the last activity from the follower
+ * @return time in milliseconds since the last activity from the follower.
*/
long timeSinceLastActivity();
/**
*/
long timeSinceLastActivity();
/**
- * This method checks if it is ok to replicate
+ * This method checks if the next replicate message can be sent to the follower. This is an optimization to avoid
+ * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
+ * yet within the current heart beat interval
*
* @return true if it is ok to replicate, false otherwise
*/
boolean okToReplicate();
/**
*
* @return true if it is ok to replicate, false otherwise
*/
boolean okToReplicate();
/**
- * @return the payload data version of the follower.
+ * Returns the log entry payload data version of the follower.
+ *
+ * @return the payload data version.
*/
short getPayloadVersion();
/**
* Sets the payload data version of the follower.
*/
short getPayloadVersion();
/**
* Sets the payload data version of the follower.
+ *
+ * @param payloadVersion the payload data version.
*/
void setPayloadVersion(short payloadVersion);
/**
*/
void setPayloadVersion(short payloadVersion);
/**
+ * Returns the the raft version of the follower.
+ *
* @return the raft version of the follower.
*/
short getRaftVersion();
/**
* Sets the raft version of the follower.
* @return the raft version of the follower.
*/
short getRaftVersion();
/**
* Sets the raft version of the follower.
+ *
+ * @param raftVersion the raft version.
- void setRaftVersion(short payloadVersion);
+ void setRaftVersion(short raftVersion);
/**
* Returns the LeaderInstallSnapshotState for the in progress install snapshot.
/**
* Returns the LeaderInstallSnapshotState for the in progress install snapshot.
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
+/**
+ * Implementation of the FollowerLogInformation interface.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
public class FollowerLogInformationImpl implements FollowerLogInformation {
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
public class FollowerLogInformationImpl implements FollowerLogInformation {
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private LeaderInstallSnapshotState installSnapshotState;
private LeaderInstallSnapshotState installSnapshotState;
+ /**
+ * Constructs an instance.
+ *
+ * @param peerInfo the associated PeerInfo of the follower.
+ * @param matchIndex the initial match index.
+ * @param context the RaftActorContext.
+ */
public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
this.nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
this.nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
@Override
public boolean setNextIndex(long nextIndex) {
@Override
public boolean setNextIndex(long nextIndex) {
- if(this.nextIndex != nextIndex) {
+ if (this.nextIndex != nextIndex) {
this.nextIndex = nextIndex;
return true;
}
this.nextIndex = nextIndex;
return true;
}
- public long incrMatchIndex(){
+ public long incrMatchIndex() {
return matchIndex++;
}
@Override
public boolean setMatchIndex(long matchIndex) {
return matchIndex++;
}
@Override
public boolean setMatchIndex(long matchIndex) {
- if(this.matchIndex != matchIndex) {
+ if (this.matchIndex != matchIndex) {
this.matchIndex = matchIndex;
return true;
}
this.matchIndex = matchIndex;
return true;
}
@Override
public boolean isFollowerActive() {
@Override
public boolean isFollowerActive() {
- if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
return false;
}
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
return false;
}
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
- return (stopwatch.isRunning()) &&
- (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ return stopwatch.isRunning()
+ && elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis();
@Override
public boolean okToReplicate() {
@Override
public boolean okToReplicate() {
- if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
+ if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
return false;
}
// Return false if we are trying to send duplicate data before the heartbeat interval
return false;
}
// Return false if we are trying to send duplicate data before the heartbeat interval
- if(getNextIndex() == lastReplicatedIndex){
- if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
- .getHeartBeatInterval().toMillis()){
- return false;
- }
+ if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ < context.getConfigParams().getHeartBeatInterval().toMillis()) {
+ return false;
private void resetLastReplicated(){
lastReplicatedIndex = getNextIndex();
private void resetLastReplicated(){
lastReplicatedIndex = getNextIndex();
- if(lastReplicatedStopwatch.isRunning()){
+ if (lastReplicatedStopwatch.isRunning()) {
lastReplicatedStopwatch.reset();
}
lastReplicatedStopwatch.start();
lastReplicatedStopwatch.reset();
}
lastReplicatedStopwatch.start();
@Override
public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
@Override
public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
- if(this.installSnapshotState == null) {
+ if (this.installSnapshotState == null) {
this.installSnapshotState = Preconditions.checkNotNull(state);
}
}
this.installSnapshotState = Preconditions.checkNotNull(state);
}
}
@Override
public void onReceive(Object message) {
@Override
public void onReceive(Object message) {
- if(message instanceof CaptureSnapshotReply) {
+ if (message instanceof CaptureSnapshotReply) {
Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(),
params.captureSnapshot.getUnAppliedEntries(),
params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(),
params.captureSnapshot.getUnAppliedEntries(),
params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
- params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)), getSelf());
+ params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)),
+ getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
} else if (message instanceof ReceiveTimeout) {
LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms",
getSelf().tell(PoisonPill.getInstance(), getSelf());
} else if (message instanceof ReceiveTimeout) {
LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms",
- public void update(long currentTerm, String votedFor) {
+ public void update(long newTerm, String newVotedFor) {
throw new UnsupportedOperationException();
}
@Override
throw new UnsupportedOperationException();
}
@Override
- public void updateAndPersist(long currentTerm, String votedFor) {
+ public void updateAndPersist(long newTerm, String newVotedFor) {
throw new UnsupportedOperationException();
}
throw new UnsupportedOperationException();
}
* An akka Procedure that does nothing.
*
* @author Thomas Pantelis
* An akka Procedure that does nothing.
*
* @author Thomas Pantelis
+ *
+ * @param <T> the Procedure type
*/
public class NoopProcedure<T> implements Procedure<T> {
*/
public class NoopProcedure<T> implements Procedure<T> {
@Override
public void apply(Object notUsed) {
@Override
public void apply(Object notUsed) {
*
* @author Thomas Pantelis
*/
*
* @author Thomas Pantelis
*/
public interface PeerAddressResolver {
/**
* Resolves a raft actor peer id to it's remote actor address.
public interface PeerAddressResolver {
/**
* Resolves a raft actor peer id to it's remote actor address.
private String address;
private VotingState votingState;
private String address;
private VotingState votingState;
+ /**
+ * Constructs an instance.
+ *
+ * @param id the id of the peer.
+ * @param address the address of the peer.
+ * @param votingState the VotingState of the peer.
+ */
public PeerInfo(String id, String address, VotingState votingState) {
this.id = id;
this.address = address;
public PeerInfo(String id, String address, VotingState votingState) {
this.id = id;
this.address = address;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
- protected final Logger LOG = LoggerFactory.getLogger(getClass());
-
/**
* This context should NOT be passed directly to any other actor it is
/**
* This context should NOT be passed directly to any other actor it is
- * only to be consumed by the RaftActorBehaviors
+ * only to be consumed by the RaftActorBehaviors.
*/
private final RaftActorContextImpl context;
*/
private final RaftActorContextImpl context;
private boolean shuttingDown;
private boolean shuttingDown;
- public RaftActor(String id, Map<String, String> peerAddresses,
+ protected RaftActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams, short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
Optional<ConfigParams> configParams, short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
- configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(),
+ configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
@Override
protected void handleRecover(Object message) {
@Override
protected void handleRecover(Object message) {
- if(raftRecovery == null) {
+ if (raftRecovery == null) {
raftRecovery = newRaftActorRecoverySupport();
}
boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
raftRecovery = newRaftActorRecoverySupport();
}
boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
+ if (recoveryComplete) {
onRecoveryComplete();
initializeBehavior();
onRecoveryComplete();
initializeBehavior();
- void initializeBehavior(){
+ void initializeBehavior() {
changeCurrentBehavior(new Follower(context));
}
changeCurrentBehavior(new Follower(context));
}
long startTime = System.nanoTime();
long startTime = System.nanoTime();
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("{}: Applying state for log index {} data {}",
persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState.getReplicatedLogEntry().getData());
LOG.debug("{}: Applying state for log index {} data {}",
persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState.getReplicatedLogEntry().getData());
}
long elapsedTime = System.nanoTime() - startTime;
}
long elapsedTime = System.nanoTime() - startTime;
- if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+ if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
}
LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
}
} else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
} else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
- }
+ LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persist(applyEntries, NoopProcedure.instance());
persistence().persist(applyEntries, NoopProcedure.instance());
new FindLeaderReply(getLeaderAddress()),
getSelf()
);
new FindLeaderReply(getLeaderAddress()),
getSelf()
);
- } else if(message instanceof GetOnDemandRaftState) {
+ } else if (message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
onGetOnDemandRaftStats();
- } else if(message instanceof InitiateCaptureSnapshot) {
+ } else if (message instanceof InitiateCaptureSnapshot) {
- } else if(message instanceof SwitchBehavior) {
+ } else if (message instanceof SwitchBehavior) {
switchBehavior((SwitchBehavior) message);
switchBehavior((SwitchBehavior) message);
- } else if(message instanceof LeaderTransitioning) {
+ } else if (message instanceof LeaderTransitioning) {
- } else if(message instanceof Shutdown) {
+ } else if (message instanceof Shutdown) {
- } else if(message instanceof Runnable) {
+ } else if (message instanceof Runnable) {
((Runnable)message).run();
((Runnable)message).run();
- } else if(message instanceof NoopPayload) {
+ } else if (message instanceof NoopPayload) {
persistData(null, null, (NoopPayload)message);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
persistData(null, null, (NoopPayload)message);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
- if(leadershipTransferInProgress == null) {
+ if (leadershipTransferInProgress == null) {
leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
private void onShutDown() {
LOG.debug("{}: onShutDown", persistenceId());
private void onShutDown() {
LOG.debug("{}: onShutDown", persistenceId());
private void onLeaderTransitioning() {
LOG.debug("{}: onLeaderTransitioning", persistenceId());
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
private void onLeaderTransitioning() {
LOG.debug("{}: onLeaderTransitioning", persistenceId());
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+ if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
}
}
private void switchBehavior(SwitchBehavior message) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
}
}
private void switchBehavior(SwitchBehavior message) {
- if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+ if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
RaftState newState = message.getNewState();
RaftState newState = message.getNewState();
- if( newState == RaftState.Leader || newState == RaftState.Follower) {
+ if ( newState == RaftState.Leader || newState == RaftState.Follower) {
switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
Map<String, String> peerAddresses = new HashMap<>();
Map<String, Boolean> peerVotingStates = new HashMap<>();
Map<String, String> peerAddresses = new HashMap<>();
Map<String, Boolean> peerVotingStates = new HashMap<>();
- for(PeerInfo info: context.getPeers()) {
+ for (PeerInfo info: context.getPeers()) {
peerVotingStates.put(info.getId(), info.isVoting());
peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
}
peerVotingStates.put(info.getId(), info.isVoting());
peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
}
builder.lastLogTerm(lastLogEntry.getTerm());
}
builder.lastLogTerm(lastLogEntry.getTerm());
}
- if(getCurrentBehavior() instanceof AbstractLeader) {
+ if (getCurrentBehavior() instanceof AbstractLeader) {
AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
Collection<String> followerIds = leader.getFollowerIds();
List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
Collection<String> followerIds = leader.getFollowerIds();
List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
- for(String id: followerIds) {
+ for (String id: followerIds) {
final FollowerLogInformation info = leader.getFollower(id);
followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
final FollowerLogInformation info = leader.getFollower(id);
followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
// it can happen that the state has not changed but the leader has changed.
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
// it can happen that the state has not changed but the leader has changed.
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if(!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) ||
- oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
- if(roleChangeNotifier.isPresent()) {
+ if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
+ || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
+ if (roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
currentBehavior.getLeaderPayloadVersion()), getSelf());
}
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
currentBehavior.getLeaderPayloadVersion()), getSelf());
}
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
- if(leadershipTransferInProgress != null) {
+ if (leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
}
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
}
- if (roleChangeNotifier.isPresent() &&
- (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
+ if (roleChangeNotifier.isPresent()
+ && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
currentBehavior.state().name()), getSelf());
}
roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
currentBehavior.state().name()), getSelf());
}
public long snapshotSequenceNr() {
// When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
// so that we can delete the persistent journal based on the saved sequence-number
public long snapshotSequenceNr() {
// When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
// so that we can delete the persistent journal based on the saved sequence-number
- // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
- // was saved and not the number we saved.
- // We would want to override it , by asking akka to use the last-sequence number known to us.
+ // However , when akka replays the journal during recovery, it replays it from the sequence number when the
+ // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
+ // last-sequence number known to us.
return context.getSnapshotManager().getLastSequenceNumber();
}
/**
* When a derived RaftActor needs to persist something it must call
* persistData.
return context.getSnapshotManager().getLastSequenceNumber();
}
/**
* When a derived RaftActor needs to persist something it must call
* persistData.
- *
- * @param clientActor
- * @param identifier
- * @param data
*/
protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
*/
protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
- }
+ LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
final RaftActorContext raftContext = getRaftActorContext();
replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
final RaftActorContext raftContext = getRaftActorContext();
replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
// Increment the Commit Index and the Last Applied values
raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
raftContext.setLastApplied(replicatedLogEntry1.getIndex());
// Increment the Commit Index and the Last Applied values
raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
raftContext.setLastApplied(replicatedLogEntry1.getIndex());
/**
* Derived actors can call the isLeader method to check if the current
/**
* Derived actors can call the isLeader method to check if the current
- * RaftActor is the Leader or not
+ * RaftActor is the Leader or not.
*
* @return true it this RaftActor is a Leader false otherwise
*/
*
* @return true it this RaftActor is a Leader false otherwise
*/
}
protected final boolean isLeaderActive() {
}
protected final boolean isLeaderActive() {
- return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader &&
- !shuttingDown && !isLeadershipTransferInProgress();
+ return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader
+ && !shuttingDown && !isLeadershipTransferInProgress();
}
private boolean isLeadershipTransferInProgress() {
}
private boolean isLeadershipTransferInProgress() {
*
* @return A reference to the leader if known, null otherwise
*/
*
* @return A reference to the leader if known, null otherwise
*/
- public ActorSelection getLeader(){
+ public ActorSelection getLeader() {
String leaderAddress = getLeaderAddress();
String leaderAddress = getLeaderAddress();
- if(leaderAddress == null){
+ if (leaderAddress == null) {
+ * Returns the id of the current leader.
*
* @return the current leader's id
*/
*
* @return the current leader's id
*/
- protected final String getLeaderId(){
+ protected final String getLeaderId() {
return getCurrentBehavior().getLeaderId();
}
return getCurrentBehavior().getLeaderId();
}
return getCurrentBehavior().state();
}
return getCurrentBehavior().state();
}
- protected Long getCurrentTerm(){
+ protected Long getCurrentTerm() {
return context.getTermInformation().getCurrentTerm();
}
return context.getTermInformation().getCurrentTerm();
}
protected void updateConfigParams(ConfigParams configParams) {
// obtain the RaftPolicy for oldConfigParams and the updated one.
protected void updateConfigParams(ConfigParams configParams) {
// obtain the RaftPolicy for oldConfigParams and the updated one.
- String oldRaftPolicy = context.getConfigParams().
- getCustomRaftPolicyImplementationClass();
- String newRaftPolicy = configParams.
- getCustomRaftPolicyImplementationClass();
+ String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
+ String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
oldRaftPolicy, newRaftPolicy);
LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
oldRaftPolicy, newRaftPolicy);
String previousLeaderId = behavior.getLeaderId();
short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
String previousLeaderId = behavior.getLeaderId();
short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
- LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+ LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
+ previousLeaderId);
changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
} else {
changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
} else {
protected void setPersistence(boolean persistent) {
DataPersistenceProvider currentPersistence = persistence();
protected void setPersistence(boolean persistent) {
DataPersistenceProvider currentPersistence = persistence();
- if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
+ if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
setPersistence(new PersistentDataProvider(this));
setPersistence(new PersistentDataProvider(this));
- if(getCurrentBehavior() != null) {
+ if (getCurrentBehavior() != null) {
LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
captureSnapshot();
}
LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
captureSnapshot();
}
- } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
+ } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
setPersistence(new NonPersistentDataProvider() {
/**
* The way snapshotting works is,
setPersistence(new NonPersistentDataProvider() {
/**
* The way snapshotting works is,
- public void saveSnapshot(Object o) {
+ public void saveSnapshot(Object object) {
// Make saving Snapshot successful
// Committing the snapshot here would end up calling commit in the creating state which would
// be a state violation. That's why now we send a message to commit the snapshot.
// Make saving Snapshot successful
// Committing the snapshot here would end up calling commit in the creating state which would
// be a state violation. That's why now we send a message to commit the snapshot.
/**
* setPeerAddress sets the address of a known peer at a later time.
/**
* setPeerAddress sets the address of a known peer at a later time.
* <p>
* This is to account for situations where a we know that a peer
* exists but we do not know an address up-front. This may also be used in
* situations where a known peer starts off in a different location and we
* need to change it's address
* <p>
* This is to account for situations where a we know that a peer
* exists but we do not know an address up-front. This may also be used in
* situations where a known peer starts off in a different location and we
* need to change it's address
* <p>
* Note that if the peerId does not match the list of peers passed to
* this actor during construction an IllegalStateException will be thrown.
* <p>
* Note that if the peerId does not match the list of peers passed to
* this actor during construction an IllegalStateException will be thrown.
- *
- * @param peerId
- * @param peerAddress
- protected void setPeerAddress(String peerId, String peerAddress){
+ protected void setPeerAddress(String peerId, String peerAddress) {
context.setPeerAddress(peerId, peerAddress);
}
/**
* The applyState method will be called by the RaftActor when some data
context.setPeerAddress(peerId, peerAddress);
}
/**
* The applyState method will be called by the RaftActor when some data
- * needs to be applied to the actor's state
+ * needs to be applied to the actor's state.
*
* @param clientActor A reference to the client who sent this message. This
* is the same reference that was passed to persistData
*
* @param clientActor A reference to the client who sent this message. This
* is the same reference that was passed to persistData
protected abstract void onStateChanged();
/**
protected abstract void onStateChanged();
/**
- * Notifier Actor for this RaftActor to notify when a role change happens
+ * Notifier Actor for this RaftActor to notify when a role change happens.
+ *
* @return ActorRef - ActorRef of the notifier or Optional.absent if none.
*/
protected abstract Optional<ActorRef> getRoleChangeNotifier();
* @return ActorRef - ActorRef of the notifier or Optional.absent if none.
*/
protected abstract Optional<ActorRef> getRoleChangeNotifier();
* work prior to performing the operation. On completion of any work, the run method must be called on the
* given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
* this actor's thread dispatcher as as it modifies internal state.
* work prior to performing the operation. On completion of any work, the run method must be called on the
* given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
* this actor's thread dispatcher as as it modifies internal state.
* <p>
* The default implementation immediately runs the operation.
*
* <p>
* The default implementation immediately runs the operation.
*
}
protected void onLeaderChanged(String oldLeader, String newLeader) {
}
protected void onLeaderChanged(String oldLeader, String newLeader) {
- private String getLeaderAddress(){
- if(isLeader()){
+ private String getLeaderAddress() {
+ if (isLeader()) {
return getSelf().path().toString();
}
String leaderId = getLeaderId();
return getSelf().path().toString();
}
String leaderId = getLeaderId();
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
- persistenceId(), leaderId, peerAddress);
- }
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
- protected boolean hasFollowers(){
+ protected boolean hasFollowers() {
return getRaftActorContext().hasFollowers();
}
return getRaftActorContext().hasFollowers();
}
/**
* A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
*/
/**
* A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
*/
- private static abstract class BehaviorState implements Immutable {
+ private abstract static class BehaviorState implements Immutable {
@Nullable abstract RaftActorBehavior getBehavior();
@Nullable abstract RaftActorBehavior getBehavior();
@Nullable abstract String getLastValidLeaderId();
@Nullable abstract String getLastValidLeaderId();
@Nullable abstract String getLastLeaderId();
@Nullable abstract String getLastLeaderId();
@Nullable abstract short getLeaderPayloadVersion();
}
@Nullable abstract short getLeaderPayloadVersion();
}
import java.util.Collection;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.Collection;
import java.util.Optional;
import java.util.function.LongSupplier;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
*/
public interface RaftActorContext {
/**
*/
public interface RaftActorContext {
/**
- * Create a new local actor
- * @param props
- * @return a reference to the newly created actor
+ * Creates a new local actor.
+ *
+ * @param props the Props used to create the actor.
+ * @return a reference to the newly created actor.
*/
ActorRef actorOf(Props props);
/**
*/
ActorRef actorOf(Props props);
/**
- * Create a actor selection
- * @param path
- * @return an actor selection for the given actor path
+ * Creates an actor selection.
+ *
+ * @param path the path.
+ * @return an actor selection for the given actor path.
*/
ActorSelection actorSelection(String path);
/**
*/
ActorSelection actorSelection(String path);
/**
- * Get the identifier for the RaftActor. This identifier represents the
- * name of the actor whose common state is being shared. For example the
- * id could be 'inventory'
+ * Returns the identifier for the RaftActor. This identifier represents the
+ * name of the actor whose common state is being shared.
*
* @return the identifier
*/
String getId();
/**
*
* @return the identifier
*/
String getId();
/**
+ * Returns the reference to the RaftActor.
+ *
* @return A reference to the RaftActor itself. This could be used to send messages
* to the RaftActor
*/
* @return A reference to the RaftActor itself. This could be used to send messages
* to the RaftActor
*/
/**
* The akka Cluster singleton for the actor system if one is configured.
*
/**
* The akka Cluster singleton for the actor system if one is configured.
*
- * @return an Optional containing the CLuster instance is present.
+ * @return an Optional containing the Cluster instance is present.
*/
Optional<Cluster> getCluster();
/**
*/
Optional<Cluster> getCluster();
/**
- * @return the ElectionTerm information
+ * Returns the current ElectionTerm information.
+ *
+ * @return the ElectionTerm.
ElectionTerm getTermInformation();
/**
ElectionTerm getTermInformation();
/**
- * @return index of highest log entry known to be committed (initialized to 0, increases monotonically)
+ * Returns the index of highest log entry known to be committed.
+ *
+ * @return index of highest log entry known to be committed.
*/
long getCommitIndex();
/**
*/
long getCommitIndex();
/**
+ * Sets the index of highest log entry known to be committed.
+ *
* @param commitIndex new commit index
*/
void setCommitIndex(long commitIndex);
/**
* @param commitIndex new commit index
*/
void setCommitIndex(long commitIndex);
/**
- * @return index of highest log entry applied to state machine (initialized to 0, increases monotonically)
+ * Returns index of highest log entry applied to state machine.
+ *
+ * @return index of highest log entry applied to state machine.
*/
long getLastApplied();
*/
long getLastApplied();
- * @param lastApplied the index of the last log entry that was applied to the state
+ * Sets index of highest log entry applied to state machine.
+ *
+ * @param lastApplied the new applied index.
*/
void setLastApplied(long lastApplied);
/**
*/
void setLastApplied(long lastApplied);
/**
+ * Sets the ReplicatedLog instance.
- * @param replicatedLog the replicated log of the current RaftActor
+ * @param replicatedLog the ReplicatedLog instance.
- void setReplicatedLog(ReplicatedLog replicatedLog);
+ void setReplicatedLog(@Nonnull ReplicatedLog replicatedLog);
- * @return A representation of the log
+ * Returns the ReplicatedLog instance.
+ *
+ * @return the ReplicatedLog instance.
ReplicatedLog getReplicatedLog();
/**
ReplicatedLog getReplicatedLog();
/**
- * @return The ActorSystem associated with this context
+ * Returns the The ActorSystem associated with this context.
+ *
+ * @return the ActorSystem.
ActorSystem getActorSystem();
/**
ActorSystem getActorSystem();
/**
- * @return the logger to be used for logging messages to a log file
+ * Returns the logger to be used for logging messages.
+ *
+ * @return the logger.
- * Get the address of the peer as a String. This is the same format in
- * which a consumer would provide the address
+ * Gets the address of a peer as a String. This is the same format in which a consumer would provide the address.
- * @param peerId
- * @return The address of the peer or null if the address has not yet been
- * resolved
+ * @param peerId the id of the peer.
+ * @return the address of the peer or null if the address has not yet been resolved.
String getPeerAddress(String peerId);
/**
String getPeerAddress(String peerId);
/**
- * @param serverCfgPayload
+ * Updates the peers and information to match the given ServerConfigurationPayload.
+ *
+ * @param serverCfgPayload the ServerConfigurationPayload.
*/
void updatePeerIds(ServerConfigurationPayload serverCfgPayload);
/**
*/
void updatePeerIds(ServerConfigurationPayload serverCfgPayload);
/**
+ * Returns the PeerInfo instances for each peer.
+ *
* @return list of PeerInfo
*/
* @return list of PeerInfo
*/
Collection<PeerInfo> getPeers();
/**
Collection<PeerInfo> getPeers();
/**
- * @return the list of peer IDs.
+ * Returns the id's for each peer.
+ *
+ * @return the list of peer id's.
Collection<String> getPeerIds();
/**
Collection<String> getPeerIds();
/**
- * Get the PeerInfo for the given peer.
+ * Returns the PeerInfo for the given peer.
+ * @return the PeerInfo or null if not found.
PeerInfo getPeerInfo(String peerId);
/**
PeerInfo getPeerInfo(String peerId);
/**
- * @param name
- * @param address
+ * @param id the id of the new peer.
+ * @param address the address of the new peer.
+ * @param votingState the VotingState of the new peer.
- void addToPeers(String name, String address, VotingState votingState);
+ void addToPeers(String id, String address, VotingState votingState);
+ * @param id the id of the peer to remove.
- void removePeer(String name);
+ void removePeer(String id);
- * Given a peerId return the corresponding actor
- * <p>
+ * Returns an ActorSelection for a peer.
- *
- * @param peerId
- * @return The actorSelection corresponding to the peer or null if the
- * address has not yet been resolved
+ * @param peerId the id of the peer.
+ * @return the actorSelection corresponding to the peer or null if the address has not yet been resolved.
ActorSelection getPeerActorSelection(String peerId);
/**
ActorSelection getPeerActorSelection(String peerId);
/**
- * Set Peer Address can be called at a later time to change the address of
- * a known peer.
+ * Sets the address of a peer.
- * <p>
- * Throws an IllegalStateException if the peer is unknown
- *
- * @param peerId
- * @param peerAddress
+ * @param peerId the id of the peer.
+ * @param peerAddress the address of the peer.
*/
void setPeerAddress(String peerId, String peerAddress);
/**
*/
void setPeerAddress(String peerId, String peerAddress);
/**
+ * Returns the ConfigParams instance.
+ *
+ * @return the ConfigParams instance.
ConfigParams getConfigParams();
/**
ConfigParams getConfigParams();
/**
+ * Returns the SnapshotManager instance.
- * @return the SnapshotManager for this RaftActor
+ * @return the SnapshotManager instance.
SnapshotManager getSnapshotManager();
/**
SnapshotManager getSnapshotManager();
/**
+ * Returns the DataPersistenceProvider instance.
- * @return the DataPersistenceProvider for this RaftActor
+ * @return the DataPersistenceProvider instance.
DataPersistenceProvider getPersistenceProvider();
/**
DataPersistenceProvider getPersistenceProvider();
/**
+ * Determines if there are any peer followers.
- * @return true if the RaftActor has followers else false
+ * @return true if there are followers otherwise false.
*/
boolean hasFollowers();
/**
*/
boolean hasFollowers();
/**
+ * Returns the total available memory for use in calculations. Normally this returns JVM's max memory but can be
+ * overridden for unit tests.
- * @return the total memory used by the ReplicatedLog
+ * @return the total memory.
*/
long getTotalMemory();
/**
*/
long getTotalMemory();
/**
+ * Sets the retriever of the total memory metric.
- * @param retriever a supplier of the total memory metric
+ * @param retriever a supplier of the total memory metric.
*/
@VisibleForTesting
void setTotalMemoryRetriever(LongSupplier retriever);
/**
*/
@VisibleForTesting
void setTotalMemoryRetriever(LongSupplier retriever);
/**
+ * Returns the payload version to be used when replicating data.
- * @return the payload version to be used when replicating data
+ * @return the payload version.
*/
short getPayloadVersion();
/**
*/
short getPayloadVersion();
/**
- * @return an implementation of the RaftPolicy so that the Raft code can be adapted
+ * Returns the RaftPolicy used to determine certain Raft behaviors.
+ *
+ * @return the RaftPolicy instance.
RaftPolicy getRaftPolicy();
/**
RaftPolicy getRaftPolicy();
/**
- * @return true if there are any dynamic server configuration changes available,
- * false if static peer configurations are still in use
+ * Determines if there have been any dynamic server configuration changes applied.
+ *
+ * @return true if dynamic server configuration changes have been applied, false otherwise, meaning that static
+ * peer configuration is still in use.
*/
boolean isDynamicServerConfigurationInUse();
/**
*/
boolean isDynamicServerConfigurationInUse();
/**
- * Configures the dynamic server configurations are avaialble for the RaftActor
+ * Sets that dynamic server configuration changes have been applied.
*/
void setDynamicServerConfigurationInUse();
/**
*/
void setDynamicServerConfigurationInUse();
/**
- * @return the RaftActor's peer information as a ServerConfigurationPayload if the
- * dynamic server configurations are available, otherwise returns null
+ * Returns the peer information as a ServerConfigurationPayload if dynamic server configurations have been applied.
+ *
+ * @param includeSelf include this peer's info.
+ * @return the peer information as a ServerConfigurationPayload or null if no dynamic server configurations have
+ * been applied.
- @Nullable ServerConfigurationPayload getPeerServerInfo(boolean includeSelf);
+ @Nullable
+ ServerConfigurationPayload getPeerServerInfo(boolean includeSelf);
- * @return true if this RaftActor is a voting member of the cluster, false otherwise.
+ * Determines if this peer is a voting member of the cluster.
+ *
+ * @return true if this peer is a voting member, false otherwise.
*/
boolean isVotingMember();
/**
*/
boolean isVotingMember();
/**
+ * Determines if there are any voting peers.
+ *
* @return true if there are any voting peers, false otherwise.
*/
boolean anyVotingPeers();
/**
* @return true if there are any voting peers, false otherwise.
*/
boolean anyVotingPeers();
/**
- * @return current behavior attached to the raft actor.
+ * Returns the current behavior attached to the RaftActor.
+ *
+ * @return current behavior.
*/
RaftActorBehavior getCurrentBehavior();
}
*/
RaftActorBehavior getCurrentBehavior();
}
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
+/**
+ * Implementation of the RaftActorContext interface.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
public class RaftActorContextImpl implements RaftActorContext {
private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
public class RaftActorContextImpl implements RaftActorContext {
private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
- private final Logger LOG;
+ private final Logger log;
private ConfigParams configParams;
private ConfigParams configParams;
this.lastApplied = lastApplied;
this.configParams = configParams;
this.persistenceProvider = persistenceProvider;
this.lastApplied = lastApplied;
this.configParams = configParams;
this.persistenceProvider = persistenceProvider;
for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
cluster = Optional.of(Cluster.get(getActorSystem()));
} catch(Exception e) {
// An exception means there's no cluster configured. This will only happen in unit tests.
cluster = Optional.of(Cluster.get(getActorSystem()));
} catch(Exception e) {
// An exception means there's no cluster configured. This will only happen in unit tests.
- LOG.debug("{}: Could not obtain Cluster: {}", getId(), e);
+ log.debug("{}: Could not obtain Cluster: {}", getId(), e);
cluster = Optional.empty();
}
}
cluster = Optional.empty();
}
}
}
@Override public Logger getLogger() {
}
@Override public Logger getLogger() {
@Override
public String getPeerAddress(String peerId) {
@Override
public String getPeerAddress(String peerId) {
- String peerAddress = null;
PeerInfo peerInfo = peerInfoMap.get(peerId);
if(peerInfo != null) {
peerAddress = peerInfo.getAddress();
PeerInfo peerInfo = peerInfoMap.get(peerId);
if(peerInfo != null) {
peerAddress = peerInfo.getAddress();
- LOG.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
+ log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
setDynamicServerConfigurationInUse();
}
setDynamicServerConfigurationInUse();
}
- public void addToPeers(String id, String address, VotingState votingState) {
- peerInfoMap.put(id, new PeerInfo(id, address, votingState));
+ public void addToPeers(String peerId, String address, VotingState votingState) {
+ peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
public void setPeerAddress(String peerId, String peerAddress) {
PeerInfo peerInfo = peerInfoMap.get(peerId);
if(peerInfo != null) {
public void setPeerAddress(String peerId, String peerAddress) {
PeerInfo peerInfo = peerInfoMap.get(peerId);
if(peerInfo != null) {
- LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
+ log.info("Peer address for peer {} set to {}", peerId, peerAddress);
peerInfo.setAddress(peerAddress);
}
}
peerInfo.setAddress(peerAddress);
}
}
@Override
public SnapshotManager getSnapshotManager() {
if(snapshotManager == null){
@Override
public SnapshotManager getSnapshotManager() {
if(snapshotManager == null){
- snapshotManager = new SnapshotManager(this, LOG);
+ snapshotManager = new SnapshotManager(this, log);
}
return snapshotManager;
}
}
return snapshotManager;
}
@Override
public boolean hasFollowers() {
@Override
public boolean hasFollowers() {
- return getPeerIds().size() > 0;
+ return !getPeerIds().isEmpty();
newConfig.add(new ServerInfo(getId(), votingMember));
}
newConfig.add(new ServerInfo(getId(), votingMember));
}
- return (new ServerConfigurationPayload(newConfig));
+ return new ServerConfigurationPayload(newConfig);
try {
currentBehavior.close();
} catch (Exception e) {
try {
currentBehavior.close();
} catch (Exception e) {
- LOG.debug("{}: Error closing behavior {}", getId(), currentBehavior.state());
+ log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
- public <T> void persist(final T o, final Procedure<T> procedure) {
- if(getDelegate().isRecoveryApplicable()) {
- super.persist(o, procedure);
+ public <T> void persist(final T object, final Procedure<T> procedure) {
+ if (getDelegate().isRecoveryApplicable()) {
+ super.persist(object, procedure);
- if(o instanceof ReplicatedLogEntry) {
- Payload payload = ((ReplicatedLogEntry)o).getData();
- if(payload instanceof PersistentPayload) {
+ if (object instanceof ReplicatedLogEntry) {
+ Payload payload = ((ReplicatedLogEntry)object).getData();
+ if (payload instanceof PersistentPayload) {
// We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
// on recovery if data persistence is later enabled.
// We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
// on recovery if data persistence is later enabled.
- persistentProvider.persist(payload, p -> procedure.apply(o));
+ persistentProvider.persist(payload, p -> procedure.apply(object));
- super.persist(o, procedure);
+ super.persist(object, procedure);
- super.persist(o, procedure);
+ super.persist(object, procedure);
// safely run on the actor's thread dispatcher.
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
// safely run on the actor's thread dispatcher.
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
- new Runnable() {
- @Override
- public void run() {
- LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
- finish(true);
- }
+ (Runnable) () -> {
+ LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+ finish(true);
}, raftActor.getContext().system().dispatcher(), raftActor.self());
}
}, raftActor.getContext().system().dispatcher(), raftActor.self());
}
/**
* This method is called during recovery at the start of a batch of state entries. Derived
* classes should perform any initialization needed to start a batch.
/**
* This method is called during recovery at the start of a batch of state entries. Derived
* classes should perform any initialization needed to start a batch.
+ *
+ * @param maxBatchSize the maximum batch size.
*/
void startLogRecoveryBatch(int maxBatchSize);
*/
void startLogRecoveryBatch(int maxBatchSize);
*
* @author Thomas Pantelis
*/
*
* @author Thomas Pantelis
*/
class RaftActorRecoverySupport {
private final RaftActorContext context;
private final RaftActorRecoveryCohort cohort;
class RaftActorRecoverySupport {
private final RaftActorContext context;
private final RaftActorRecoveryCohort cohort;
anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
- if(isMigratedSerializable(message)) {
+ if (isMigratedSerializable(message)) {
hasMigratedDataRecovered = true;
}
hasMigratedDataRecovered = true;
}
private void possiblyRestoreFromSnapshot() {
byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
private void possiblyRestoreFromSnapshot() {
byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
- if(restoreFromSnapshot == null) {
+ if (restoreFromSnapshot == null) {
+ if (anyDataRecovered) {
log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
context.getId());
return;
}
log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
context.getId());
return;
}
- try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
Snapshot snapshot = (Snapshot) ois.readObject();
log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
Snapshot snapshot = (Snapshot) ois.readObject();
log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
+ } catch (Exception e) {
log.error("{}: Error deserializing snapshot restore", context.getId(), e);
}
}
log.error("{}: Error deserializing snapshot restore", context.getId(), e);
}
}
}
private void initRecoveryTimer() {
}
private void initRecoveryTimer() {
- if(recoveryTimer == null) {
+ if (recoveryTimer == null) {
recoveryTimer = Stopwatch.createStarted();
}
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
recoveryTimer = Stopwatch.createStarted();
}
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
- if(log.isDebugEnabled()) {
- log.debug("{}: SnapshotOffer called..", context.getId());
- }
+ log.debug("{}: SnapshotOffer called..", context.getId());
initRecoveryTimer();
Snapshot snapshot = (Snapshot) offer.snapshot();
initRecoveryTimer();
Snapshot snapshot = (Snapshot) offer.snapshot();
- for(ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
- if(isMigratedPayload(entry)) {
+ for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
+ if (isMigratedPayload(entry)) {
hasMigratedDataRecovered = true;
}
}
hasMigratedDataRecovered = true;
}
}
- if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (!context.getPersistenceProvider().isRecoveryApplicable()) {
// We may have just transitioned to disabled and have a snapshot containing state data and/or log
// entries - we don't want to preserve these, only the server config and election term info.
// We may have just transitioned to disabled and have a snapshot containing state data and/or log
// entries - we don't want to preserve these, only the server config and election term info.
if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
- if(isMigratedSerializable(snapshot.getServerConfiguration())) {
+ if (isMigratedSerializable(snapshot.getServerConfiguration())) {
hasMigratedDataRecovered = true;
}
}
hasMigratedDataRecovered = true;
}
}
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
logEntry.getIndex(), logEntry.size());
}
log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
logEntry.getIndex(), logEntry.size());
}
- if(isServerConfigurationPayload(logEntry)){
+ if (isServerConfigurationPayload(logEntry)) {
context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
}
context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
}
- if(isMigratedPayload(logEntry)) {
+ if (isMigratedPayload(logEntry)) {
hasMigratedDataRecovered = true;
}
hasMigratedDataRecovered = true;
}
- if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (context.getPersistenceProvider().isRecoveryApplicable()) {
replicatedLog().append(logEntry);
replicatedLog().append(logEntry);
- } else if(!isPersistentPayload(logEntry)) {
+ } else if (!isPersistentPayload(logEntry)) {
dataRecoveredWithPersistenceDisabled = true;
}
}
private void onRecoveredApplyLogEntries(long toIndex) {
dataRecoveredWithPersistenceDisabled = true;
}
}
private void onRecoveredApplyLogEntries(long toIndex) {
- if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (!context.getPersistenceProvider().isRecoveryApplicable()) {
dataRecoveredWithPersistenceDisabled = true;
return;
}
long lastUnappliedIndex = context.getLastApplied() + 1;
dataRecoveredWithPersistenceDisabled = true;
return;
}
long lastUnappliedIndex = context.getLastApplied() + 1;
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
// it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
// but the entry itself has made it to that state and recovered via the snapshot
log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
// it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
// but the entry itself has made it to that state and recovered via the snapshot
log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
long lastApplied = lastUnappliedIndex - 1;
for (long i = lastUnappliedIndex; i <= toIndex; i++) {
ReplicatedLogEntry logEntry = replicatedLog().get(i);
long lastApplied = lastUnappliedIndex - 1;
for (long i = lastUnappliedIndex; i <= toIndex; i++) {
ReplicatedLogEntry logEntry = replicatedLog().get(i);
+ if (logEntry != null) {
lastApplied++;
batchRecoveredLogEntry(logEntry);
} else {
lastApplied++;
batchRecoveredLogEntry(logEntry);
} else {
}
private void onDeleteEntries(DeleteEntries deleteEntries) {
}
private void onDeleteEntries(DeleteEntries deleteEntries) {
- if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (context.getPersistenceProvider().isRecoveryApplicable()) {
replicatedLog().removeFrom(deleteEntries.getFromIndex());
} else {
dataRecoveredWithPersistenceDisabled = true;
replicatedLog().removeFrom(deleteEntries.getFromIndex());
} else {
dataRecoveredWithPersistenceDisabled = true;
initRecoveryTimer();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
initRecoveryTimer();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(!isServerConfigurationPayload(logEntry)){
- if(currentRecoveryBatchCount == 0) {
+ if (!isServerConfigurationPayload(logEntry)) {
+ if (currentRecoveryBatchCount == 0) {
cohort.startLogRecoveryBatch(batchSize);
}
cohort.appendRecoveredLogEntry(logEntry.getData());
cohort.startLogRecoveryBatch(batchSize);
}
cohort.appendRecoveredLogEntry(logEntry.getData());
- if(++currentRecoveryBatchCount >= batchSize) {
+ if (++currentRecoveryBatchCount >= batchSize) {
endCurrentLogRecoveryBatch();
}
}
endCurrentLogRecoveryBatch();
}
}
}
private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
}
private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
- if(currentRecoveryBatchCount > 0) {
+ if (currentRecoveryBatchCount > 0) {
endCurrentLogRecoveryBatch();
}
String recoveryTime = "";
endCurrentLogRecoveryBatch();
}
String recoveryTime = "";
- if(recoveryTimer != null) {
+ if (recoveryTimer != null) {
recoveryTimer.stop();
recoveryTime = " in " + recoveryTimer.toString();
recoveryTimer = null;
}
recoveryTimer.stop();
recoveryTime = " in " + recoveryTimer.toString();
recoveryTimer = null;
}
- log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
- "Persistence Id = " + context.getId() +
- " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
- "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+ log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + "Persistence Id = "
+ + context.getId() + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, "
+ + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
replicatedLog().getSnapshotTerm(), replicatedLog().size());
replicatedLog().getSnapshotTerm(), replicatedLog().size());
- if(dataRecoveredWithPersistenceDisabled ||
- hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
- if(hasMigratedDataRecovered) {
+ if (dataRecoveredWithPersistenceDisabled
+ || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (hasMigratedDataRecovered) {
log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
} else {
log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
} else {
log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
// messages. Either way, we persist a snapshot and delete all the messages from the akka journal
// to clean out unwanted messages.
// messages. Either way, we persist a snapshot and delete all the messages from the akka journal
// to clean out unwanted messages.
- Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+ Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+ -1, -1, -1, -1,
context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
context.getPeerServerInfo(true));
persistentProvider.saveSnapshot(snapshot);
persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
context.getPeerServerInfo(true));
persistentProvider.saveSnapshot(snapshot);
persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
- } else if(hasMigratedDataRecovered) {
+ } else if (hasMigratedDataRecovered) {
log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
context.getSnapshotManager().capture(replicatedLog().last(), -1);
log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
context.getSnapshotManager().capture(replicatedLog().last(), -1);
- private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
+ private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry) {
return repLogEntry.getData() instanceof ServerConfigurationPayload;
}
return repLogEntry.getData() instanceof ServerConfigurationPayload;
}
- private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry){
+ private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry) {
return repLogEntry.getData() instanceof PersistentPayload;
}
return repLogEntry.getData() instanceof PersistentPayload;
}
- private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry){
+ private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry) {
return isMigratedSerializable(repLogEntry.getData());
}
return isMigratedSerializable(repLogEntry.getData());
}
- private static boolean isMigratedSerializable(Object message){
+ private static boolean isMigratedSerializable(Object message) {
return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
}
}
return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
}
}
- public void onNewOperation(ServerOperationContext<?> operationContext) {
+ public void onNewOperation(ServerOperationContext<?> newOperationContext) {
- sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+ sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
- super.onNewOperation(operationContext);
+ super.onNewOperation(newOperationContext);
*
* @param <T> the operation type
*/
*
* @param <T> the operation type
*/
- private static abstract class ServerOperationContext<T> {
+ private abstract static class ServerOperationContext<T> {
private final T operation;
private final ActorRef clientRequestor;
private final Identifier contextId;
private final T operation;
private final ActorRef clientRequestor;
private final Identifier contextId;
}
private void onCaptureSnapshotReply(byte[] snapshotBytes) {
}
private void onCaptureSnapshotReply(byte[] snapshotBytes) {
- log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
+ log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(),
+ snapshotBytes.length);
context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory());
}
context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory());
}
private void onGetSnapshot(ActorRef sender) {
log.debug("{}: onGetSnapshot", context.getId());
private void onGetSnapshot(ActorRef sender) {
log.debug("{}: onGetSnapshot", context.getId());
- if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (context.getPersistenceProvider().isRecoveryApplicable()) {
CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
context.getReplicatedLog().last(), -1, false);
CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
context.getReplicatedLog().last(), -1, false);
cohort.createSnapshot(snapshotReplyActor);
} else {
cohort.createSnapshot(snapshotReplyActor);
} else {
- Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+ Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+ -1, -1, -1, -1,
context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
context.getPeerServerInfo(true));
context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
context.getPeerServerInfo(true));
package org.opendaylight.controller.cluster.raft;
/**
package org.opendaylight.controller.cluster.raft;
/**
+ * Enumerates the raft versions.
+ *
* @author Thomas Pantelis
*/
public interface RaftVersions {
* @author Thomas Pantelis
*/
public interface RaftVersions {
* @return the ReplicatedLogEntry if found, otherwise null if the adjusted index less than 0 or
* greater than the size of the in-memory journal.
*/
* @return the ReplicatedLogEntry if found, otherwise null if the adjusted index less than 0 or
* greater than the size of the in-memory journal.
*/
- @Nullable ReplicatedLogEntry get(long index);
+ @Nullable
+ ReplicatedLogEntry get(long index);
/**
* Return the last replicated log entry in the log or null of not found.
/**
* Return the last replicated log entry in the log or null of not found.
+ *
+ * @return the last replicated log entry in the log or null of not found.
- @Nullable ReplicatedLogEntry last();
+ @Nullable
+ ReplicatedLogEntry last();
/**
* Return the index of the last entry in the log or -1 if the log is empty.
/**
* Return the index of the last entry in the log or -1 if the log is empty.
+ *
+ * @return the index of the last entry in the log or -1 if the log is empty.
*/
long lastIndex();
/**
* Return the term of the last entry in the log or -1 if the log is empty.
*/
long lastIndex();
/**
* Return the term of the last entry in the log or -1 if the log is empty.
+ *
+ * @return the term of the last entry in the log or -1 if the log is empty.
*/
void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
*/
void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
+ /**
+ * Appends an entry to the in-memory log and persists it as well.
+ *
+ * @param replicatedLogEntry the entry to append
+ * @param callback the Procedure to be notified when persistence is complete.
+ */
void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
/**
void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
/**
void snapshotRollback();
/**
void snapshotRollback();
/**
- * Size of the data in the log (in bytes)
+ * Returns the size of the data in the log (in bytes)
+ *
+ * @return the size of the data in the log (in bytes).
- * We decide if snapshot need to be captured based on the count/memory consumed.
- * @param replicatedLogEntry
+ * Determines if a snapshot need to be captured based on the count/memory consumed.
+ *
+ * @param replicatedLogEntry the last log entry.
*/
void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry);
*/
void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry);
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
/**
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
/**
- * Represents one entry in the replicated log
+ * Represents one entry in the replicated log.
*/
public interface ReplicatedLogEntry {
/**
*/
public interface ReplicatedLogEntry {
/**
+ * Returns the payload/data to be replicated.
- * @return The payload/data to be replicated
+ * @return the payload/data
*/
Payload getData();
/**
*/
Payload getData();
/**
+ * Returns the term of the entry.
- * @return The term of the entry
+ * Returns the index of the entry.
- * @return The index of the entry
+ * Returns the size of the entry in bytes. An approximate number may be good enough.
- * @return The size of the entry in bytes. An approximate number may be good enough.
+ * @return the size of the entry in bytes.
class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
private static final int DATA_SIZE_DIVIDER = 5;
class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
private static final int DATA_SIZE_DIVIDER = 5;
- private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
- @Override
- public void apply(final DeleteEntries notUsed) {
- }
- };
+ private final Procedure<DeleteEntries> deleteProcedure = NoopProcedure.instance();
private final RaftActorContext context;
private long dataSizeSinceLastSnapshot = 0L;
private final RaftActorContext context;
private long dataSizeSinceLastSnapshot = 0L;
// handler. This also holds for multiple persist calls in context
// of a single command.
context.getPersistenceProvider().persist(replicatedLogEntry,
// handler. This also holds for multiple persist calls in context
// of a single command.
context.getPersistenceProvider().persist(replicatedLogEntry,
- new Procedure<ReplicatedLogEntry>() {
- @Override
- public void apply(final ReplicatedLogEntry param) throws Exception {
- context.getLogger().debug("{}: persist complete {}", context.getId(), param);
+ param -> {
+ context.getLogger().debug("{}: persist complete {}", context.getId(), param);
- int logEntrySize = param.size();
- dataSizeSinceLastSnapshot += logEntrySize;
+ int logEntrySize = param.size();
+ dataSizeSinceLastSnapshot += logEntrySize;
- if (callback != null) {
- callback.apply(param);
- }
+ if (callback != null) {
+ callback.apply(param);
-}
\ No newline at end of file
private final long term;
private final Payload payload;
private final long term;
private final Payload payload;
+ /**
+ * Constructs an instance.
+ *
+ * @param index the index
+ * @param term the term
+ * @param payload the payload
+ */
public ReplicatedLogImplEntry(final long index, final long term, final Payload payload) {
this.index = index;
this.term = term;
public ReplicatedLogImplEntry(final long index, final long term, final Payload payload) {
this.index = index;
this.term = term;
@Override
public int size() {
@Override
public int size() {
- if(serializedSize < 0) {
+ if (serializedSize < 0) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos);
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos);
import java.util.List;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import java.util.List;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+/**
+ * Represents a snapshot of the raft data.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
public class Snapshot implements Serializable {
private static final long serialVersionUID = -8298574936724056236L;
public class Snapshot implements Serializable {
private static final long serialVersionUID = -8298574936724056236L;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.slf4j.Logger;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.slf4j.Logger;
+/**
+ * Manages the capturing of snapshots for a RaftActor.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
public class SnapshotManager implements SnapshotState {
private final SnapshotState IDLE = new Idle();
private final SnapshotState PERSISTING = new Persisting();
private final SnapshotState CREATING = new Creating();
public class SnapshotManager implements SnapshotState {
private final SnapshotState IDLE = new Idle();
private final SnapshotState PERSISTING = new Persisting();
private final SnapshotState CREATING = new Creating();
- private final Logger LOG;
+ private final Logger log;
private final RaftActorContext context;
private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
new LastAppliedTermInformationReader();
private final RaftActorContext context;
private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
new LastAppliedTermInformationReader();
private ApplySnapshot applySnapshot;
private Consumer<byte[]> applySnapshotProcedure;
private ApplySnapshot applySnapshot;
private Consumer<byte[]> applySnapshotProcedure;
+ /**
+ * Constructs an instance.
+ *
+ * @param context the RaftActorContext
+ * @param logger the Logger
+ */
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
}
public boolean isApplying() {
}
public boolean isApplying() {
return captureSnapshot;
}
return captureSnapshot;
}
- private boolean hasFollowers(){
+ private boolean hasFollowers() {
return context.hasFollowers();
}
return context.hasFollowers();
}
- private String persistenceId(){
+ private String persistenceId() {
return context.getId();
}
return context.getId();
}
+ /**
+ * Constructs a CaptureSnapshot instance.
+ *
+ * @param lastLogEntry the last log entry for the snapshot.
+ * @param replicatedToAllIndex the index of the last entry replicated to all followers.
+ * @param installSnapshotInitiated true if snapshot is initiated to install on a follower.
+ * @return a new CaptureSnapshot instance.
+ */
public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
boolean installSnapshotInitiated) {
TermInformationReader lastAppliedTermInfoReader =
public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
boolean installSnapshotInitiated) {
TermInformationReader lastAppliedTermInfoReader =
long lastLogEntryIndex = lastAppliedIndex;
long lastLogEntryTerm = lastAppliedTerm;
long lastLogEntryIndex = lastAppliedIndex;
long lastLogEntryTerm = lastAppliedTerm;
- if(lastLogEntry != null) {
+ if (lastLogEntry != null) {
lastLogEntryIndex = lastLogEntry.getIndex();
lastLogEntryTerm = lastLogEntry.getTerm();
} else {
lastLogEntryIndex = lastLogEntry.getIndex();
lastLogEntryTerm = lastLogEntry.getTerm();
} else {
- LOG.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+ log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
persistenceId(), lastAppliedIndex, lastAppliedTerm);
}
persistenceId(), lastAppliedIndex, lastAppliedTerm);
}
@Override
public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
@Override
public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
- LOG.debug("capture should not be called in state {}", this);
+ log.debug("capture should not be called in state {}", this);
return false;
}
@Override
return false;
}
@Override
- public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
- LOG.debug("captureToInstall should not be called in state {}", this);
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
+ String targetFollower) {
+ log.debug("captureToInstall should not be called in state {}", this);
return false;
}
@Override
public void apply(ApplySnapshot snapshot) {
return false;
}
@Override
public void apply(ApplySnapshot snapshot) {
- LOG.debug("apply should not be called in state {}", this);
+ log.debug("apply should not be called in state {}", this);
}
@Override
public void persist(final byte[] snapshotBytes, final long totalMemory) {
}
@Override
public void persist(final byte[] snapshotBytes, final long totalMemory) {
- LOG.debug("persist should not be called in state {}", this);
+ log.debug("persist should not be called in state {}", this);
}
@Override
public void commit(final long sequenceNumber, long timeStamp) {
}
@Override
public void commit(final long sequenceNumber, long timeStamp) {
- LOG.debug("commit should not be called in state {}", this);
+ log.debug("commit should not be called in state {}", this);
}
@Override
public void rollback() {
}
@Override
public void rollback() {
- LOG.debug("rollback should not be called in state {}", this);
+ log.debug("rollback should not be called in state {}", this);
}
@Override
public long trimLog(final long desiredTrimIndex) {
}
@Override
public long trimLog(final long desiredTrimIndex) {
- LOG.debug("trimLog should not be called in state {}", this);
+ log.debug("trimLog should not be called in state {}", this);
return -1;
}
protected long doTrimLog(final long desiredTrimIndex) {
// we would want to keep the lastApplied as its used while capturing snapshots
long lastApplied = context.getLastApplied();
return -1;
}
protected long doTrimLog(final long desiredTrimIndex) {
// we would want to keep the lastApplied as its used while capturing snapshots
long lastApplied = context.getLastApplied();
- long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+ long tempMin = Math.min(desiredTrimIndex, lastApplied > -1 ? lastApplied - 1 : -1);
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+ if (log.isTraceEnabled()) {
+ log.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
persistenceId(), desiredTrimIndex, lastApplied, tempMin);
}
if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
persistenceId(), desiredTrimIndex, lastApplied, tempMin);
}
if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+ log.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
context.getTermInformation().getCurrentTerm());
//use the term of the temp-min, since we check for isPresent, entry will not be null
context.getTermInformation().getCurrentTerm());
//use the term of the temp-min, since we check for isPresent, entry will not be null
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+ if (tempMin > currentBehavior.getReplicatedToAllIndex()) {
// It's possible a follower was lagging and an install snapshot advanced its match index past
// the current replicatedToAllIndex. Since the follower is now caught up we should advance the
// replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
// It's possible a follower was lagging and an install snapshot advanced its match index past
// the current replicatedToAllIndex. Since the follower is now caught up we should advance the
// replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
- if(captureSnapshot.isInstallSnapshotInitiated()) {
- LOG.info("{}: Initiating snapshot capture {} to install on {}",
+ if (captureSnapshot.isInstallSnapshotInitiated()) {
+ log.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
} else {
persistenceId(), captureSnapshot, targetFollower);
} else {
- LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+ log.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
}
lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
}
lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
- LOG.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
+ log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
SnapshotManager.this.currentState = CREATING;
SnapshotManager.this.currentState = CREATING;
createSnapshotProcedure.run();
} catch (Exception e) {
SnapshotManager.this.currentState = IDLE;
createSnapshotProcedure.run();
} catch (Exception e) {
SnapshotManager.this.currentState = IDLE;
- LOG.error("Error creating snapshot", e);
+ log.error("Error creating snapshot", e);
- public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
+ String targetFollower) {
return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
}
@Override
return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
}
@Override
- public void apply(ApplySnapshot applySnapshot) {
- SnapshotManager.this.applySnapshot = applySnapshot;
+ public void apply(ApplySnapshot toApply) {
+ SnapshotManager.this.applySnapshot = toApply;
lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
- LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
+ log.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
- context.getPersistenceProvider().saveSnapshot(applySnapshot.getSnapshot());
+ context.getPersistenceProvider().saveSnapshot(toApply.getSnapshot());
SnapshotManager.this.currentState = PERSISTING;
}
SnapshotManager.this.currentState = PERSISTING;
}
context.getPersistenceProvider().saveSnapshot(snapshot);
context.getPersistenceProvider().saveSnapshot(snapshot);
- LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
+ log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
- long dataThreshold = totalMemory *
- context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+ long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold;
boolean logSizeExceededSnapshotBatchCount =
boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold;
boolean logSizeExceededSnapshotBatchCount =
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
- if(LOG.isDebugEnabled()) {
- if(dataSizeThresholdExceeded) {
- LOG.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}",
+ if (log.isDebugEnabled()) {
+ if (dataSizeThresholdExceeded) {
+ log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}",
context.getId(), context.getReplicatedLog().dataSize(), dataThreshold,
captureSnapshot.getLastAppliedIndex());
} else {
context.getId(), context.getReplicatedLog().dataSize(), dataThreshold,
captureSnapshot.getLastAppliedIndex());
} else {
- LOG.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}",
+ log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}",
context.getId(), context.getReplicatedLog().size(),
context.getId(), context.getReplicatedLog().size(),
- context.getConfigParams().getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex());
+ context.getConfigParams().getSnapshotBatchCount(),
+ captureSnapshot.getLastAppliedIndex());
// Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
// install snapshot to a follower.
// Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
// install snapshot to a follower.
- if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+ if (captureSnapshot.getReplicatedToAllIndex() >= 0) {
currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
}
currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
}
- } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+ } else if (captureSnapshot.getReplicatedToAllIndex() != -1) {
// clear the log based on replicatedToAllIndex
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
captureSnapshot.getReplicatedToAllTerm());
// clear the log based on replicatedToAllIndex
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
captureSnapshot.getReplicatedToAllTerm());
context.getReplicatedLog().getSnapshotTerm());
}
context.getReplicatedLog().getSnapshotTerm());
}
- LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
- "and term: {}", context.getId(), context.getReplicatedLog().getSnapshotIndex(),
+ log.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} and term: {}",
+ context.getId(), context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm());
if (context.getId().equals(currentBehavior.getLeaderId())
context.getReplicatedLog().getSnapshotTerm());
if (context.getId().equals(currentBehavior.getLeaderId())
@Override
public void commit(final long sequenceNumber, long timeStamp) {
@Override
public void commit(final long sequenceNumber, long timeStamp) {
- LOG.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber);
+ log.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber);
- if(applySnapshot != null) {
+ if (applySnapshot != null) {
try {
Snapshot snapshot = applySnapshot.getSnapshot();
try {
Snapshot snapshot = applySnapshot.getSnapshot();
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
- if(snapshot.getServerConfiguration() != null) {
+ if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
}
context.updatePeerIds(snapshot.getServerConfiguration());
}
- if(snapshot.getState().length > 0 ) {
+ if (snapshot.getState().length > 0 ) {
applySnapshotProcedure.accept(snapshot.getState());
}
applySnapshot.getCallback().onSuccess();
} catch (Exception e) {
applySnapshotProcedure.accept(snapshot.getState());
}
applySnapshot.getCallback().onSuccess();
} catch (Exception e) {
- LOG.error("{}: Error applying snapshot", context.getId(), e);
+ log.error("{}: Error applying snapshot", context.getId(), e);
}
} else {
context.getReplicatedLog().snapshotCommit();
}
} else {
context.getReplicatedLog().snapshotCommit();
@Override
public void rollback() {
// Nothing to rollback if we're applying a snapshot from the leader.
@Override
public void rollback() {
// Nothing to rollback if we're applying a snapshot from the leader.
- if(applySnapshot == null) {
+ if (applySnapshot == null) {
context.getReplicatedLog().snapshotRollback();
context.getReplicatedLog().snapshotRollback();
- LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+ log.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle."
+ + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
private interface TermInformationReader {
long getIndex();
private interface TermInformationReader {
long getIndex();
- static class LastAppliedTermInformationReader implements TermInformationReader{
+ static class LastAppliedTermInformationReader implements TermInformationReader {
private long index;
private long term;
private long index;
private long term;
- public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
- ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+ LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex, ReplicatedLogEntry lastLogEntry,
+ boolean hasFollowers) {
ReplicatedLogEntry entry = log.get(originalIndex);
this.index = -1L;
this.term = -1L;
if (!hasFollowers) {
ReplicatedLogEntry entry = log.get(originalIndex);
this.index = -1L;
this.term = -1L;
if (!hasFollowers) {
- if(lastLogEntry != null) {
+ if (lastLogEntry != null) {
// since we have persisted the last-log-entry to persistent journal before the capture,
// we would want to snapshot from this entry.
index = lastLogEntry.getIndex();
// since we have persisted the last-log-entry to persistent journal before the capture,
// we would want to snapshot from this entry.
index = lastLogEntry.getIndex();
} else if (entry != null) {
index = entry.getIndex();
term = entry.getTerm();
} else if (entry != null) {
index = entry.getIndex();
term = entry.getTerm();
- } else if(log.getSnapshotIndex() > -1){
+ } else if (log.getSnapshotIndex() > -1) {
index = log.getSnapshotIndex();
term = log.getSnapshotTerm();
}
index = log.getSnapshotIndex();
term = log.getSnapshotTerm();
}
- public long getIndex(){
+ public long getIndex() {
return this.index;
}
@Override
return this.index;
}
@Override
+ public long getTerm() {
- private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+ private static class ReplicatedToAllTermInformationReader implements TermInformationReader {
private long index;
private long term;
private long index;
private long term;
- ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+ ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex) {
ReplicatedLogEntry entry = log.get(originalIndex);
this.index = -1L;
this.term = -1L;
ReplicatedLogEntry entry = log.get(originalIndex);
this.index = -1L;
this.term = -1L;
- public long getIndex(){
+ public long getIndex() {
return this.index;
}
@Override
return this.index;
}
@Override
+ public long getTerm() {
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+/**
+ * Interface for a snapshot phase state.
+ *
+ * @author Moiz Raja
+ * @author Thomas Pantelis
+ */
public interface SnapshotState {
/**
* @return true when a snapshot is being captured
public interface SnapshotState {
/**
* @return true when a snapshot is being captured
/**
* An abstract class that implements a Runnable operation with a timer such that if the run method isn't
* invoked within a timeout period, the operation is cancelled via {@link #doCancel}.
/**
* An abstract class that implements a Runnable operation with a timer such that if the run method isn't
* invoked within a timeout period, the operation is cancelled via {@link #doCancel}.
* <p>
* <b>Note:</b> this class is not thread safe and is intended for use only within the context of the same
* actor that's passed on construction. The run method must be called on this actor's thread dispatcher as it
* <p>
* <b>Note:</b> this class is not thread safe and is intended for use only within the context of the same
* actor that's passed on construction. The run method must be called on this actor's thread dispatcher as it
TimedRunnable(FiniteDuration timeout, RaftActor actor) {
Preconditions.checkNotNull(timeout);
Preconditions.checkNotNull(actor);
TimedRunnable(FiniteDuration timeout, RaftActor actor) {
Preconditions.checkNotNull(timeout);
Preconditions.checkNotNull(actor);
- cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(), new Runnable() {
- @Override
- public void run() {
- cancel();
- }
- }, actor.getContext().system().dispatcher(), actor.self());
+ cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(),
+ (Runnable) () -> cancel(), actor.getContext().system().dispatcher(), actor.self());
}
@Override
public void run() {
}
@Override
public void run() {
canRun = false;
cancelTimer.cancel();
doRun();
canRun = false;
cancelTimer.cancel();
doRun();