* @param index the log index
*/
protected void applyLogToStateMachine(final long index) {
- long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- final ApplyState msg;
+ final ApplyState applyState;
final ClientRequestTracker tracker = removeClientRequestTracker(i);
if (tracker != null) {
- msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
+ applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
} else {
- msg = new ApplyState(null, null, replicatedLogEntry);
+ applyState = new ApplyState(null, null, replicatedLogEntry);
}
- actor().tell(msg, actor());
- newLastApplied = i;
+ log.debug("{}: Setting last applied to {}", logName(), i);
+
+ context.setLastApplied(i);
+ context.getApplyStateConsumer().accept(applyState);
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
}
}
- log.debug("{}: Setting last applied to {}", logName(), newLastApplied);
-
- context.setLastApplied(newLastApplied);
-
// send a message to persist a ApplyLogEntries marker message into akka's persistent journal
// will be used during recovery
//in case if the above code throws an error and this message is not sent, it would be fine