} else if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
- long elapsedTime = (System.nanoTime() - applyState.getStartTime());
- if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
- LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
- TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
- }
+ long startTime = System.nanoTime();
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Applying state for log index {} data {}",
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
+ long elapsedTime = System.nanoTime() - startTime;
+ if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+ LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+ TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+ }
+
if (!hasFollowers()) {
// for single node, the capture should happen after the apply state
// as we delete messages from the persistent journal which have made it to the snapshot
}
});
} else if(currentBehavior.state() == RaftState.Leader) {
- pauseLeader(new Runnable() {
+ pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
@Override
- public void run() {
+ protected void doRun() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+
+ @Override
+ protected void doCancel() {
self().tell(PoisonPill.getInstance(), self());
}
});
}
protected boolean isLeaderActive() {
- return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
+ return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+ !isLeadershipTransferInProgress();
+ }
+
+ private boolean isLeadershipTransferInProgress() {
+ return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
}
/**
/**
* This method is called prior to operations such as leadership transfer and actor shutdown when the leader
* must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
- * work prior to performing the operation. On completion of any work, the run method must be called to
- * proceed with the given operation.
+ * work prior to performing the operation. On completion of any work, the run method must be called on the
+ * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
+ * this actor's thread dispatcher as as it modifies internal state.
* <p>
* The default implementation immediately runs the operation.
*