import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
- protected final Logger LOG = LoggerFactory.getLogger(getClass());
-
/**
* This context should NOT be passed directly to any other actor it is
- * only to be consumed by the RaftActorBehaviors
+ * only to be consumed by the RaftActorBehaviors.
*/
private final RaftActorContextImpl context;
private boolean shuttingDown;
- public RaftActor(String id, Map<String, String> peerAddresses,
+ protected RaftActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams, short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
- configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(),
+ configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
@Override
protected void handleRecover(Object message) {
- if(raftRecovery == null) {
+ if (raftRecovery == null) {
raftRecovery = newRaftActorRecoverySupport();
}
boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
- if(recoveryComplete) {
+ if (recoveryComplete) {
onRecoveryComplete();
initializeBehavior();
}
@VisibleForTesting
- void initializeBehavior(){
+ void initializeBehavior() {
changeCurrentBehavior(new Follower(context));
}
long startTime = System.nanoTime();
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("{}: Applying state for log index {} data {}",
persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState.getReplicatedLogEntry().getData());
}
long elapsedTime = System.nanoTime() - startTime;
- if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+ if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
}
} else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
- }
+ LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persist(applyEntries, NoopProcedure.instance());
new FindLeaderReply(getLeaderAddress()),
getSelf()
);
- } else if(message instanceof GetOnDemandRaftState) {
+ } else if (message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
- } else if(message instanceof InitiateCaptureSnapshot) {
+ } else if (message instanceof InitiateCaptureSnapshot) {
captureSnapshot();
- } else if(message instanceof SwitchBehavior) {
+ } else if (message instanceof SwitchBehavior) {
switchBehavior((SwitchBehavior) message);
- } else if(message instanceof LeaderTransitioning) {
+ } else if (message instanceof LeaderTransitioning) {
onLeaderTransitioning();
- } else if(message instanceof Shutdown) {
+ } else if (message instanceof Shutdown) {
onShutDown();
- } else if(message instanceof Runnable) {
+ } else if (message instanceof Runnable) {
((Runnable)message).run();
- } else if(message instanceof NoopPayload) {
+ } else if (message instanceof NoopPayload) {
persistData(null, null, (NoopPayload)message);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
- if(leadershipTransferInProgress == null) {
+ if (leadershipTransferInProgress == null) {
leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
private void onShutDown() {
LOG.debug("{}: onShutDown", persistenceId());
- if(shuttingDown) {
+ if (shuttingDown) {
return;
}
private void onLeaderTransitioning() {
LOG.debug("{}: onLeaderTransitioning", persistenceId());
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+ if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
}
}
private void switchBehavior(SwitchBehavior message) {
- if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+ if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
RaftState newState = message.getNewState();
- if( newState == RaftState.Leader || newState == RaftState.Follower) {
+ if ( newState == RaftState.Leader || newState == RaftState.Follower) {
switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
Map<String, String> peerAddresses = new HashMap<>();
Map<String, Boolean> peerVotingStates = new HashMap<>();
- for(PeerInfo info: context.getPeers()) {
+ for (PeerInfo info: context.getPeers()) {
peerVotingStates.put(info.getId(), info.isVoting());
peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
}
builder.lastLogTerm(lastLogEntry.getTerm());
}
- if(getCurrentBehavior() instanceof AbstractLeader) {
+ if (getCurrentBehavior() instanceof AbstractLeader) {
AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
Collection<String> followerIds = leader.getFollowerIds();
List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
- for(String id: followerIds) {
+ for (String id: followerIds) {
final FollowerLogInformation info = leader.getFollower(id);
followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
// it can happen that the state has not changed but the leader has changed.
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if(!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) ||
- oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
- if(roleChangeNotifier.isPresent()) {
+ if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
+ || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
+ if (roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
currentBehavior.getLeaderPayloadVersion()), getSelf());
}
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
- if(leadershipTransferInProgress != null) {
+ if (leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
}
- if (roleChangeNotifier.isPresent() &&
- (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
+ if (roleChangeNotifier.isPresent()
+ && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
currentBehavior.state().name()), getSelf());
}
public long snapshotSequenceNr() {
// When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
// so that we can delete the persistent journal based on the saved sequence-number
- // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
- // was saved and not the number we saved.
- // We would want to override it , by asking akka to use the last-sequence number known to us.
+ // However , when akka replays the journal during recovery, it replays it from the sequence number when the
+ // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
+ // last-sequence number known to us.
return context.getSnapshotManager().getLastSequenceNumber();
}
/**
* When a derived RaftActor needs to persist something it must call
* persistData.
- *
- * @param clientActor
- * @param identifier
- * @param data
*/
protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
- }
+ LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
final RaftActorContext raftContext = getRaftActorContext();
replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
- if (!hasFollowers()){
+ if (!hasFollowers()) {
// Increment the Commit Index and the Last Applied values
raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
raftContext.setLastApplied(replicatedLogEntry1.getIndex());
/**
* Derived actors can call the isLeader method to check if the current
- * RaftActor is the Leader or not
+ * RaftActor is the Leader or not.
*
* @return true it this RaftActor is a Leader false otherwise
*/
}
protected final boolean isLeaderActive() {
- return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader &&
- !shuttingDown && !isLeadershipTransferInProgress();
+ return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader
+ && !shuttingDown && !isLeadershipTransferInProgress();
}
private boolean isLeadershipTransferInProgress() {
*
* @return A reference to the leader if known, null otherwise
*/
- public ActorSelection getLeader(){
+ public ActorSelection getLeader() {
String leaderAddress = getLeaderAddress();
- if(leaderAddress == null){
+ if (leaderAddress == null) {
return null;
}
}
/**
+ * Returns the id of the current leader.
*
* @return the current leader's id
*/
- protected final String getLeaderId(){
+ protected final String getLeaderId() {
return getCurrentBehavior().getLeaderId();
}
return getCurrentBehavior().state();
}
- protected Long getCurrentTerm(){
+ protected Long getCurrentTerm() {
return context.getTermInformation().getCurrentTerm();
}
protected void updateConfigParams(ConfigParams configParams) {
// obtain the RaftPolicy for oldConfigParams and the updated one.
- String oldRaftPolicy = context.getConfigParams().
- getCustomRaftPolicyImplementationClass();
- String newRaftPolicy = configParams.
- getCustomRaftPolicyImplementationClass();
+ String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
+ String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
oldRaftPolicy, newRaftPolicy);
String previousLeaderId = behavior.getLeaderId();
short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
- LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+ LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
+ previousLeaderId);
changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
} else {
protected void setPersistence(boolean persistent) {
DataPersistenceProvider currentPersistence = persistence();
- if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
+ if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
setPersistence(new PersistentDataProvider(this));
- if(getCurrentBehavior() != null) {
+ if (getCurrentBehavior() != null) {
LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
captureSnapshot();
}
- } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
+ } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
setPersistence(new NonPersistentDataProvider() {
/**
* The way snapshotting works is,
* </ol>
*/
@Override
- public void saveSnapshot(Object o) {
+ public void saveSnapshot(Object object) {
// Make saving Snapshot successful
// Committing the snapshot here would end up calling commit in the creating state which would
// be a state violation. That's why now we send a message to commit the snapshot.
/**
* setPeerAddress sets the address of a known peer at a later time.
+ *
* <p>
* This is to account for situations where a we know that a peer
* exists but we do not know an address up-front. This may also be used in
* situations where a known peer starts off in a different location and we
* need to change it's address
+ *
* <p>
* Note that if the peerId does not match the list of peers passed to
* this actor during construction an IllegalStateException will be thrown.
- *
- * @param peerId
- * @param peerAddress
*/
- protected void setPeerAddress(String peerId, String peerAddress){
+ protected void setPeerAddress(String peerId, String peerAddress) {
context.setPeerAddress(peerId, peerAddress);
}
/**
* The applyState method will be called by the RaftActor when some data
- * needs to be applied to the actor's state
+ * needs to be applied to the actor's state.
*
* @param clientActor A reference to the client who sent this message. This
* is the same reference that was passed to persistData
protected abstract void onStateChanged();
/**
- * Notifier Actor for this RaftActor to notify when a role change happens
+ * Notifier Actor for this RaftActor to notify when a role change happens.
+ *
* @return ActorRef - ActorRef of the notifier or Optional.absent if none.
*/
protected abstract Optional<ActorRef> getRoleChangeNotifier();
* work prior to performing the operation. On completion of any work, the run method must be called on the
* given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
* this actor's thread dispatcher as as it modifies internal state.
+ *
* <p>
* The default implementation immediately runs the operation.
*
}
protected void onLeaderChanged(String oldLeader, String newLeader) {
-
}
- private String getLeaderAddress(){
- if(isLeader()){
+ private String getLeaderAddress() {
+ if (isLeader()) {
return getSelf().path().toString();
}
String leaderId = getLeaderId();
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
- persistenceId(), leaderId, peerAddress);
- }
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
return peerAddress;
}
- protected boolean hasFollowers(){
+ protected boolean hasFollowers() {
return getRaftActorContext().hasFollowers();
}
/**
* A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
*/
- private static abstract class BehaviorState implements Immutable {
+ private abstract static class BehaviorState implements Immutable {
@Nullable abstract RaftActorBehavior getBehavior();
+
@Nullable abstract String getLastValidLeaderId();
+
@Nullable abstract String getLastLeaderId();
+
@Nullable abstract short getLeaderPayloadVersion();
}