.setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
.build());
manager = QueuedNotificationManager.create(syncThreadPool, (key, entries) -> {
- LOG.trace("Executing job with key: {}", key);
+ LOG.trace("Executing jobs with key: {}", key);
entries.forEach(jobEntry -> new MainTask<>(jobEntry).run());
+ LOG.trace("Finished executing jobs with key: {}", key);
}, 4096, "nc-jobqueue");
}
LOG.error("Direct Exception (not failed Future) when executing job, won't even retry: {}", jobEntry, e);
}
- if (future == null) {
+ if (future != null) {
+ Futures.addCallback(future, new FutureCallback<T>() {
+ @Override
+ public void onSuccess(final T result) {
+ LOG.trace("Job completed successfully: {}", jobEntry.getKey());
+ jobEntry.setResultFuture(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.error("Job {} failed", jobEntry.getKey(), cause);
+ }
+ }, MoreExecutors.directExecutor());
+ } else {
jobEntry.setResultFuture(null);
- return;
}
- Futures.addCallback(future, new FutureCallback<T>() {
- @Override
- public void onSuccess(final T result) {
- LOG.trace("Job completed successfully: {}", jobEntry.getKey());
- jobEntry.setResultFuture(result);
- }
- @Override
- public void onFailure(final Throwable cause) {
- LOG.error("Job {} failed", jobEntry.getKey(), cause);
- }
- }, MoreExecutors.directExecutor());
+ LOG.trace("Finished running job with key: {}", jobEntry.getKey());
}
}
}