+ private void handleApplyState(final ApplyState applyState) {
+ long startTime = System.nanoTime();
+
+ Payload payload = applyState.getReplicatedLogEntry().getData();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Applying state for log index {} data {}",
+ persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+ }
+
+ if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
+ applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
+ }
+
+ long elapsedTime = System.nanoTime() - startTime;
+ if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+ LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+ TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+ }
+
+ // Send the ApplyState message back to self to handle further processing asynchronously.
+ self().tell(applyState, self());
+ }
+
+ protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ final short leaderPayloadVersion) {
+ return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
+ }
+
+ @Override
+ public long snapshotSequenceNr() {
+ // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
+ // so that we can delete the persistent journal based on the saved sequence-number
+ // However , when akka replays the journal during recovery, it replays it from the sequence number when the
+ // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
+ // last-sequence number known to us.
+ return context.getSnapshotManager().getLastSequenceNumber();
+ }
+