* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
+import akka.actor.Status;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
private RaftActorServerConfigurationSupport serverConfigurationSupport;
- private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
private boolean shuttingDown;
- protected RaftActor(String id, Map<String, String> peerAddresses,
- Optional<ConfigParams> configParams, short payloadVersion) {
+ protected RaftActor(final String id, final Map<String, String> peerAddresses,
+ final Optional<ConfigParams> configParams, final short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
}
@Override
- protected void handleRecover(Object message) {
+ protected void handleRecover(final Object message) {
if (raftRecovery == null) {
raftRecovery = newRaftActorRecoverySupport();
}
@VisibleForTesting
@SuppressWarnings("checkstyle:IllegalCatch")
- protected void changeCurrentBehavior(RaftActorBehavior newBehavior) {
+ protected void changeCurrentBehavior(final RaftActorBehavior newBehavior) {
final RaftActorBehavior currentBehavior = getCurrentBehavior();
if (currentBehavior != null) {
try {
} else if (message instanceof Runnable) {
((Runnable)message).run();
} else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload)message, false);
+ persistData(null, null, (NoopPayload) message, false);
+ } else if (message instanceof RequestLeadership) {
+ onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
}
}
+ private void onRequestLeadership(final RequestLeadership message) {
+ LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+ if (!isLeader()) {
+ // non-leader cannot satisfy leadership request
+ LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+ + " Current behavior: {}. Sending failure response",
+ persistenceId(), message, getCurrentBehavior().state());
+ message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+ + message.getRequestedFollowerId()
+ + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+ return;
+ }
+
+ final String requestedFollowerId = message.getRequestedFollowerId();
+ final ActorRef replyTo = message.getReplyTo();
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(final ActorRef raftActorRef) {
+ // sanity check
+ if (!requestedFollowerId.equals(getLeaderId())) {
+ onFailure(raftActorRef);
+ }
+
+ LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Success(null), getSelf());
+ }
+
+ @Override
+ public void onFailure(final ActorRef raftActorRef) {
+ LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Failure(
+ new LeadershipTransferFailedException(
+ "Failed to transfer leadership to " + requestedFollowerId
+ + ". Follower is not ready to become leader")),
+ getSelf());
+ }
+ }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+ }
+
private boolean possiblyHandleBehaviorMessage(final Object message) {
final RaftActorBehavior currentBehavior = getCurrentBehavior();
final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
return false;
}
- private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+ private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+ final @Nullable String followerId, final long newLeaderTimeoutInMillis) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress == null) {
- leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+ leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef) {
- leadershipTransferInProgress = null;
+ public void onSuccess(final ActorRef raftActorRef) {
+ context.setRaftActorLeadershipTransferCohort(null);
}
@Override
- public void onFailure(ActorRef raftActorRef) {
- leadershipTransferInProgress = null;
+ public void onFailure(final ActorRef raftActorRef) {
+ context.setRaftActorLeadershipTransferCohort(null);
}
});
leadershipTransferInProgress.addOnComplete(onComplete);
+
+ context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
leadershipTransferInProgress.init();
+
} else {
LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
leadershipTransferInProgress.addOnComplete(onComplete);
shuttingDown = true;
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- if (currentBehavior.state() != RaftState.Leader) {
- // For non-leaders shutdown is a no-op
- self().tell(PoisonPill.getInstance(), self());
- return;
+ switch (currentBehavior.state()) {
+ case Leader:
+ case PreLeader:
+ // Fall-through to more work
+ break;
+ default:
+ // For non-leaders shutdown is a no-op
+ self().tell(PoisonPill.getInstance(), self());
+ return;
}
if (context.hasFollowers()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef) {
+ public void onSuccess(final ActorRef raftActorRef) {
LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
@Override
- public void onFailure(ActorRef raftActorRef) {
+ public void onFailure(final ActorRef raftActorRef) {
LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
- });
+ }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
} else {
pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
@Override
}
}
- private void switchBehavior(SwitchBehavior message) {
+ private void switchBehavior(final SwitchBehavior message) {
if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
RaftState newState = message.getNewState();
- if ( newState == RaftState.Leader || newState == RaftState.Follower) {
+ if (newState == RaftState.Leader || newState == RaftState.Follower) {
+ getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
- getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
} else {
LOG.warn("Switching to behavior : {} - not supported", newState);
}
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+ OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog().dataSize())
for (String id: followerIds) {
final FollowerLogInformation info = leader.getFollower(id);
followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
- info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
+ TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
context.getPeerInfo(info.getId()).isVoting()));
}
}
- private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
+ protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
+ return OnDemandRaftState.builder();
+ }
+
+ private void handleBehaviorChange(final BehaviorState oldBehaviorState, final RaftActorBehavior currentBehavior) {
RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
if (oldBehavior != currentBehavior) {
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+ context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
}
}
- private void handleApplyState(ApplyState applyState) {
+ private void handleApplyState(final ApplyState applyState) {
long startTime = System.nanoTime();
Payload payload = applyState.getReplicatedLogEntry().getData();
self().tell(applyState, self());
}
- protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
+ protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ final short leaderPayloadVersion) {
return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
}
@VisibleForTesting
- void setCurrentBehavior(RaftActorBehavior behavior) {
+ void setCurrentBehavior(final RaftActorBehavior behavior) {
context.setCurrentBehavior(behavior);
}
&& !shuttingDown && !isLeadershipTransferInProgress();
}
- private boolean isLeadershipTransferInProgress() {
+ protected boolean isLeadershipTransferInProgress() {
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
}
return context;
}
- protected void updateConfigParams(ConfigParams configParams) {
+ protected void updateConfigParams(final ConfigParams configParams) {
// obtain the RaftPolicy for oldConfigParams and the updated one.
String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
return delegatingPersistenceProvider.getDelegate();
}
- public void setPersistence(DataPersistenceProvider provider) {
+ public void setPersistence(final DataPersistenceProvider provider) {
delegatingPersistenceProvider.setDelegate(provider);
}
- protected void setPersistence(boolean persistent) {
+ protected void setPersistence(final boolean persistent) {
DataPersistenceProvider currentPersistence = persistence();
if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
setPersistence(new PersistentDataProvider(this));
captureSnapshot();
}
} else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
- setPersistence(new NonPersistentDataProvider() {
- /**
+ setPersistence(new NonPersistentDataProvider(this) {
+ /*
* The way snapshotting works is,
* <ol>
* <li> RaftActor calls createSnapshot on the Shard
* </ol>
*/
@Override
- public void saveSnapshot(Object object) {
+ public void saveSnapshot(final Object object) {
// Make saving Snapshot successful
// Committing the snapshot here would end up calling commit in the creating state which would
// be a state violation. That's why now we send a message to commit the snapshot.
* Note that if the peerId does not match the list of peers passed to
* this actor during construction an IllegalStateException will be thrown.
*/
- protected void setPeerAddress(String peerId, String peerAddress) {
+ protected void setPeerAddress(final String peerId, final String peerAddress) {
context.setPeerAddress(peerId, peerAddress);
}
/**
* Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- @Nonnull
- protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
+ protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
* This method is called when recovery is complete.
protected abstract void onRecoveryComplete();
/**
- * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+ * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
*/
- @Nonnull
- protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
+ protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
* This method will be called by the RaftActor when the state of the
*/
protected abstract Optional<ActorRef> getRoleChangeNotifier();
+ /**
+ * This method is called on the leader when a voting change operation completes.
+ */
+ protected void onVotingStateChangeComplete() {
+ }
+
/**
* This method is called prior to operations such as leadership transfer and actor shutdown when the leader
* must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
*
* @param operation the operation to run
*/
- protected void pauseLeader(Runnable operation) {
+ protected void pauseLeader(final Runnable operation) {
operation.run();
}
- protected void onLeaderChanged(String oldLeader, String newLeader) {
+ /**
+ * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
+ * should resume normal operations.
+ *
+ * <p>
+ * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
+ */
+ protected void unpauseLeader() {
+
+ }
+
+ protected void onLeaderChanged(final String oldLeader, final String newLeader) {
}
private String getLeaderAddress() {
if (isLeader()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef) {
+ public void onSuccess(final ActorRef raftActorRef) {
LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
ensureFollowerState();
}
@Override
- public void onFailure(ActorRef raftActorRef) {
+ public void onFailure(final ActorRef raftActorRef) {
LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
ensureFollowerState();
}
initializeBehavior();
}
}
- });
+ }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
}
@Nullable abstract String getLastLeaderId();
- @Nullable abstract short getLeaderPayloadVersion();
+ abstract short getLeaderPayloadVersion();
}
/**
final RaftActorBehavior behavior) {
this.lastValidLeaderId = lastValidLeaderId;
this.lastLeaderId = lastLeaderId;
- this.behavior = Preconditions.checkNotNull(behavior);
+ this.behavior = requireNonNull(behavior);
this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
}
BehaviorState capture(final RaftActorBehavior behavior) {
if (behavior == null) {
- Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
+ verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
return NULL_BEHAVIOR_STATE;
}
return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
}
}
-
}