}
@Override
- public <T> ListenableFuture<T> enqueueJob(String key, Callable<ListenableFuture<T>> mainWorker) {
+ public <T> ListenableFuture<T> enqueueJob(final String key, final Callable<ListenableFuture<T>> mainWorker) {
- JobEntry jobEntry = new JobEntry(key, mainWorker);
+ JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
JobQueue jobQueue = jobQueueMap.computeIfAbsent(key, mapKey -> new JobQueue());
jobQueue.addEntry(jobEntry);
signalForNextJob();
if (jobQueue.getExecutingEntry() != null) {
continue;
}
- JobEntry jobEntry = jobQueue.poll();
+ JobEntry<?> jobEntry = jobQueue.poll();
if (jobEntry == null) {
// job queue is empty. so continue with next job queue entry
continue;
}
jobQueue.setExecutingEntry(jobEntry);
- MainTask worker = new MainTask(jobEntry);
+ MainTask<?> worker = new MainTask<>(jobEntry);
LOG.trace("Executing job with key: {}", jobEntry.getKey());
executeTask(worker) ;
}
}
private class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
- private final JobEntry jobEntry;
+ private final JobEntry<T> jobEntry;
- MainTask(JobEntry jobEntry) {
+ MainTask(final JobEntry<T> jobEntry) {
super(LOG, jobEntry::toString);
this.jobEntry = jobEntry;
}
return;
}
clearJob(jobEntry);
- Futures.addCallback(future, new JobCallback(jobEntry), MoreExecutors.directExecutor());
-
- }
-
- private class JobCallback<T> implements FutureCallback<T> {
- private final JobEntry jobEntry;
-
- JobCallback(JobEntry jobEntry) {
- this.jobEntry = jobEntry;
- }
-
- /**
- * This implies that all the future instances have returned success. --
- * TODO: Confirm this
- */
- @Override
- public void onSuccess(T result) {
- LOG.trace("Job completed successfully: {}", jobEntry.getKey());
- jobEntry.setResultFuture(result);
- clearJob(jobEntry);
- }
+ Futures.addCallback(future, new FutureCallback<T>() {
+ @Override
+ public void onSuccess(final T result) {
+ LOG.trace("Job completed successfully: {}", jobEntry.getKey());
+ jobEntry.setResultFuture(result);
+ clearJob(jobEntry);
+ }
- @Override
- public void onFailure(Throwable throwable) {
- clearJob(jobEntry);
- }
+ @Override
+ public void onFailure(final Throwable cause) {
+ clearJob(jobEntry);
+ }
+ }, MoreExecutors.directExecutor());
}
}
- private void clearJob(JobEntry jobEntry) {
+ private void clearJob(final JobEntry<?> jobEntry) {
String jobKey = jobEntry.getKey();
LOG.trace("About to clear jobKey: {}", jobKey);
JobQueue jobQueue = jobQueueMap.get(jobKey);
}
}
- private void executeTask(Runnable task) {
+ private void executeTask(final Runnable task) {
try {
syncThreadPool.submit(task);
} catch (RejectedExecutionException e) {