import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
new Thread(new JobQueueHandler()).start();
}
- public void enqueueJob(String key,
- Callable<List<ListenableFuture<Void>>> mainWorker) {
+ public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker) {
enqueueJob(key, mainWorker, null, 0);
}
- public void enqueueJob(String key,
- Callable<List<ListenableFuture<Void>>> mainWorker,
+ public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
RollbackCallable rollbackWorker) {
enqueueJob(key, mainWorker, rollbackWorker, 0);
}
- public void enqueueJob(String key,
- Callable<List<ListenableFuture<Void>>> mainWorker,
- int maxRetries) {
+ public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker, int maxRetries) {
enqueueJob(key, mainWorker, null, maxRetries);
}
}
/**
- * Enqueue a Job with an appropriate key.
- * A JobEntry is created and queued appropriately.
+ * This is used by the external applications to enqueue a Job
+ * with an appropriate key. A JobEntry is created and queued
+ * appropriately.
*/
public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
RollbackCallable rollbackWorker, int maxRetries) {
LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
jobQueue.addEntry(jobEntry);
jobEntriesMap.put(key, jobQueue);
+
+ DataStoreJobCoordinatorCounters.jobs_pending.inc();
}
reentrantLock.lock();
try {
jobEntriesMap.remove(jobEntry.getKey());
}
}
+ DataStoreJobCoordinatorCounters.jobs_cleared.inc();
}
/**
- * Generate the hashkey for the jobQueueMap.
+ * Used to generate the hashkey in to the jobQueueMap.
*/
private Integer getHashKey(String key) {
int code = key.hashCode();
}
/**
- * JobCallback class is used as a future callback for
- * main and rollback workers to handle success and failure.
+ * JobCallback class is used as a future callback for main and rollback
+ * workers to handle success and failure.
*/
private class JobCallback implements FutureCallback<List<Void>> {
private final JobEntry jobEntry;
}
/**
- * This implies that all the future instances have returned success. -- TODO: Confirm this
+ * This implies that all the future instances have returned
+ * success. -- TODO: Confirm this
*/
@Override
public void onSuccess(List<Void> voids) {
}
/**
- * Handle failure callbacks.
- * If more retry needed, the retrycount is decremented and mainworker is executed again.
- * After retries completed, rollbackworker is executed.
- * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
+ * This method is used to handle failure callbacks. If more
+ * retry needed, the retrycount is decremented and mainworker
+ * is executed again. After retries completed, rollbackworker
+ * is executed. If rollbackworker fails, this is a
+ * double-fault. Double fault is logged and ignored.
*/
@Override
public void onFailure(Throwable throwable) {
}
int retryCount = jobEntry.decrementRetryCountAndGet();
- if ( retryCount > 0) {
- long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
- scheduledExecutorService.schedule(
- () -> {
- MainTask worker = new MainTask(jobEntry);
- fjPool.execute(worker);
- },
- waitTime,
- TimeUnit.MILLISECONDS);
+ if (retryCount > 0) {
+ long waitTime = (RETRY_WAIT_BASE_TIME * 10) / retryCount;
+ scheduledExecutorService.schedule(() -> {
+ MainTask worker = new MainTask(jobEntry);
+ fjPool.execute(worker);
+ }, waitTime, TimeUnit.MILLISECONDS);
return;
}
}
/**
- * Execute the RollbackCallable provided by the application in the eventuality of a failure.
+ * RollbackTask is used to execute the RollbackCallable provided by the
+ * application in the eventuality of a failure.
*/
private class RollbackTask implements Runnable {
private final JobEntry jobEntry;
if (jobEntriesMap.isEmpty()) {
continue;
}
+ LOG.trace("JobQueueHandler handling queue {} with kesy size {}. Keys: {} ", i,
+ jobEntriesMap.size(), Arrays.toString(jobEntriesMap.keySet().toArray()));
- LOG.trace("JobQueueHandler handling queue {} with size {}", i, jobEntriesMap.size());
synchronized (jobEntriesMap) {
Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
while (it.hasNext()) {
MainTask worker = new MainTask(jobEntry);
LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
fjPool.execute(worker);
+ DataStoreJobCoordinatorCounters.jobs_pending.dec();
+
} else {
it.remove();
+ DataStoreJobCoordinatorCounters.jobs_remove_entry.inc();
}
}
}