import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
}
@Override
- public void handleRecover(Object message) {
+ protected void handleRecover(Object message) {
if(raftRecovery == null) {
raftRecovery = newRaftActorRecoverySupport();
}
}
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
+ if(getCurrentBehavior() != null) {
+ try {
+ getCurrentBehavior().close();
+ } catch(Exception e) {
+ LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e);
+ }
+ }
+
reusableBehaviorStateHolder.init(getCurrentBehavior());
setCurrentBehavior(newBehavior);
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
@Override
- public void handleCommand(final Object message) {
- if(serverConfigurationSupport.handleMessage(message, getSender())) {
+ protected void handleCommand(final Object message) {
+ if (serverConfigurationSupport.handleMessage(message, getSender())) {
return;
- } else if (message instanceof ApplyState){
+ }
+ if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
+ return;
+ }
+
+ if (message instanceof ApplyState) {
ApplyState applyState = (ApplyState) message;
long elapsedTime = (System.nanoTime() - applyState.getStartTime());
context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
}
- } else if (message instanceof ApplyJournalEntries){
+ } else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
+ LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
}
persistence().persist(applyEntries, NoopProcedure.instance());
onGetOnDemandRaftStats();
} else if(message instanceof InitiateCaptureSnapshot) {
captureSnapshot();
- } else if(message instanceof SwitchBehavior){
+ } else if(message instanceof SwitchBehavior) {
switchBehavior(((SwitchBehavior) message));
} else if(message instanceof LeaderTransitioning) {
onLeaderTransitioning();
onShutDown();
} else if(message instanceof Runnable) {
((Runnable)message).run();
- } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
+ } else {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
- private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+ void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
if(leadershipTransferInProgress == null) {
}
});
} else if(currentBehavior.state() == RaftState.Leader) {
- pauseLeader(new Runnable() {
+ pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
+ @Override
+ protected void doRun() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+
@Override
- public void run() {
+ protected void doCancel() {
self().tell(PoisonPill.getInstance(), self());
}
});
if(leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
+
+ serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
}
if (roleChangeNotifier.isPresent() &&
}
protected boolean isLeaderActive() {
- return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
+ return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+ !isLeadershipTransferInProgress();
+ }
+
+ private boolean isLeadershipTransferInProgress() {
+ return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
}
/**
RaftActorBehavior behavior = currentBehavior.getDelegate();
if(behavior instanceof Follower) {
String previousLeaderId = ((Follower)behavior).getLeaderId();
+ short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
- changeCurrentBehavior(new Follower(context, previousLeaderId));
+ changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
} else {
initializeBehavior();
}
/**
* This method is called prior to operations such as leadership transfer and actor shutdown when the leader
* must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
- * work prior to performing the operation. On completion of any work, the run method must be called to
- * proceed with the given operation.
+ * work prior to performing the operation. On completion of any work, the run method must be called on the
+ * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
+ * this actor's thread dispatcher as as it modifies internal state.
* <p>
* The default implementation immediately runs the operation.
*
@Override
public RaftActorBehavior get() {
if(this.message instanceof SwitchBehavior){
- return ((SwitchBehavior) message).getNewState().createBehavior(getRaftActorContext());
+ return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
}
return currentBehavior.handleMessage(sender, message);
}