return false;
}
- Procedure<ReplicatedLogEntry> persistCallback = persistedLogEntry -> {
- context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
-
- dataSizeSinceLastSnapshot += persistedLogEntry.size();
-
- if (callback != null) {
- callback.apply(persistedLogEntry);
- }
- };
-
if (doAsync) {
- context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback);
+ context.getPersistenceProvider().persistAsync(replicatedLogEntry,
+ entry -> persistCallback(entry, callback));
} else {
- context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback);
+ context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback));
}
return true;
}
+
+ private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
+ final Procedure<ReplicatedLogEntry> callback) {
+ context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
+ }
+
+ @SuppressWarnings("checkstyle:illegalCatch")
+ private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
+ final Procedure<ReplicatedLogEntry> callback) {
+ context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+ dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+ if (callback != null) {
+ try {
+ callback.apply(persistedLogEntry);
+ } catch (Exception e) {
+ context.getLogger().error("{}: persist callback failed", context.getId(), e);
+ throw new IllegalStateException("Persist callback failed", e);
+ }
+ }
+ }
}