- // When persisting events with persist it is guaranteed that the
- // persistent actor will not receive further commands between the
- // persist call and the execution(s) of the associated event
- // handler. This also holds for multiple persist calls in context
- // of a single command.
- context.getPersistenceProvider().persist(replicatedLogEntry,
- param -> {
- context.getLogger().debug("{}: persist complete {}", context.getId(), param);
-
- int logEntrySize = param.size();
- dataSizeSinceLastSnapshot += logEntrySize;
-
- if (callback != null) {
- callback.apply(param);
- }
+ if (doAsync) {
+ context.getPersistenceProvider().persistAsync(replicatedLogEntry,
+ entry -> persistCallback(entry, callback));
+ } else {
+ context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback));
+ }
+
+ return true;
+ }
+
+ private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
+ final Procedure<ReplicatedLogEntry> callback) {
+ context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
+ }
+
+ @SuppressWarnings("checkstyle:illegalCatch")
+ private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
+ final Procedure<ReplicatedLogEntry> callback) {
+ context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+ dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+ if (callback != null) {
+ try {
+ callback.apply(persistedLogEntry);
+ } catch (Exception e) {
+ context.getLogger().error("{}: persist callback failed", context.getId(), e);
+ throw new IllegalStateException("Persist callback failed", e);