import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
+import akka.actor.Status;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
private RaftActorServerConfigurationSupport serverConfigurationSupport;
- private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
private boolean shuttingDown;
protected RaftActor(String id, Map<String, String> peerAddresses,
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
- delegatingPersistenceProvider, LOG);
+ delegatingPersistenceProvider, this::handleApplyState, LOG);
context.setPayloadVersion(payloadVersion);
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
return;
}
-
if (message instanceof ApplyState) {
ApplyState applyState = (ApplyState) message;
- long startTime = System.nanoTime();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Applying state for log index {} data {}",
- persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
- applyState.getReplicatedLogEntry().getData());
- }
-
- if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) {
- applyState(applyState.getClientActor(), applyState.getIdentifier(),
- applyState.getReplicatedLogEntry().getData());
- }
-
- long elapsedTime = System.nanoTime() - startTime;
- if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
- LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
- TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
- }
-
if (!hasFollowers()) {
// for single node, the capture should happen after the apply state
// as we delete messages from the persistent journal which have made it to the snapshot
context.getSnapshotManager().trimLog(context.getLastApplied());
}
- // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState.
possiblyHandleBehaviorMessage(message);
-
} else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
} else if (message instanceof Runnable) {
((Runnable)message).run();
} else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload)message, false);
+ persistData(null, null, (NoopPayload) message, false);
+ } else if (message instanceof RequestLeadership) {
+ onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
}
}
+ private void onRequestLeadership(final RequestLeadership message) {
+ LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+ if (!isLeader()) {
+ // non-leader cannot satisfy leadership request
+ LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+ + " Current behavior: {}. Sending failure response",
+ persistenceId(), getCurrentBehavior().state());
+ message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+ + message.getRequestedFollowerId()
+ + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+ return;
+ }
+
+ final String requestedFollowerId = message.getRequestedFollowerId();
+ final ActorRef replyTo = message.getReplyTo();
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(final ActorRef raftActorRef) {
+ // sanity check
+ if (!requestedFollowerId.equals(getLeaderId())) {
+ onFailure(raftActorRef);
+ }
+
+ LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Success(null), getSelf());
+ }
+
+ @Override
+ public void onFailure(final ActorRef raftActorRef) {
+ LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Failure(
+ new LeadershipTransferFailedException(
+ "Failed to transfer leadership to " + requestedFollowerId
+ + ". Follower is not ready to become leader")),
+ getSelf());
+ }
+ }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+ }
+
private boolean possiblyHandleBehaviorMessage(final Object message) {
final RaftActorBehavior currentBehavior = getCurrentBehavior();
final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
return false;
}
- private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+ private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+ @Nullable final String followerId, long newLeaderTimeoutInMillis) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress == null) {
- leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+ leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
public void onSuccess(ActorRef raftActorRef) {
- leadershipTransferInProgress = null;
+ context.setRaftActorLeadershipTransferCohort(null);
}
@Override
public void onFailure(ActorRef raftActorRef) {
- leadershipTransferInProgress = null;
+ context.setRaftActorLeadershipTransferCohort(null);
}
});
leadershipTransferInProgress.addOnComplete(onComplete);
+
+ context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
leadershipTransferInProgress.init();
+
} else {
LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
leadershipTransferInProgress.addOnComplete(onComplete);
shuttingDown = true;
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- if (currentBehavior.state() != RaftState.Leader) {
- // For non-leaders shutdown is a no-op
- self().tell(PoisonPill.getInstance(), self());
- return;
+ switch (currentBehavior.state()) {
+ case Leader:
+ case PreLeader:
+ // Fall-through to more work
+ break;
+ default:
+ // For non-leaders shutdown is a no-op
+ self().tell(PoisonPill.getInstance(), self());
+ return;
}
if (context.hasFollowers()) {
LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
- });
+ }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
} else {
pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
@Override
private void switchBehavior(SwitchBehavior message) {
if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
RaftState newState = message.getNewState();
- if ( newState == RaftState.Leader || newState == RaftState.Follower) {
+ if (newState == RaftState.Leader || newState == RaftState.Follower) {
switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+ OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog().dataSize())
}
+ protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ return OnDemandRaftState.builder();
+ }
+
private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+ context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
}
}
+ private void handleApplyState(ApplyState applyState) {
+ long startTime = System.nanoTime();
+
+ Payload payload = applyState.getReplicatedLogEntry().getData();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Applying state for log index {} data {}",
+ persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+ }
+
+ if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
+ applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
+ }
+
+ long elapsedTime = System.nanoTime() - startTime;
+ if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+ LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+ TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+ }
+
+ // Send the ApplyState message back to self to handle further processing asynchronously.
+ self().tell(applyState, self());
+ }
+
protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
raftContext.setLastApplied(persistedLogEntry.getIndex());
// Apply the state immediately.
- self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
+ handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
// Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
&& !shuttingDown && !isLeadershipTransferInProgress();
}
- private boolean isLeadershipTransferInProgress() {
+ protected boolean isLeadershipTransferInProgress() {
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
}
protected abstract void onRecoveryComplete();
/**
- * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+ * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
*/
@Nonnull
protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
operation.run();
}
+ /**
+ * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
+ * should resume normal operations.
+ *
+ * <p>
+ * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
+ */
+ protected void unpauseLeader() {
+
+ }
+
protected void onLeaderChanged(String oldLeader, String newLeader) {
}
initializeBehavior();
}
}
- });
+ }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
}