import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DataStoreJobCoordinator {
+
private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
private static final long RETRY_WAIT_BASE_TIME = 100;
- private ScheduledExecutorService scheduledExecutorService =
- Executors.newScheduledThreadPool(5);
+
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
private final ForkJoinPool fjPool;
- private final Map<Integer,Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
+ private final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition waitCondition = reentrantLock.newCondition();
return instance;
}
- /**
- *
- */
private DataStoreJobCoordinator() {
fjPool = new ForkJoinPool();
for (int i = 0; i < THREADPOOL_SIZE; i++) {
- Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<String, JobQueue>();
+ Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
jobQueueMap.put(i, jobEntriesMap);
}
}
/**
- *
- * @param key
- * @param mainWorker
- * @param rollbackWorker
- * @param maxRetries
- *
- * This is used by the external applications to enqueue a Job with an appropriate key.
+ * Enqueue a Job with an appropriate key.
* A JobEntry is created and queued appropriately.
*/
-
public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
RollbackCallable rollbackWorker, int maxRetries) {
JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
}
/**
- * clearJob is used to cleanup the submitted job from the jobqueue.
+ * Cleanup the submitted job from the job queue.
**/
private void clearJob(JobEntry jobEntry) {
Integer hashKey = getHashKey(jobEntry.getKey());
}
/**
- *
- * @param key
- * @return generated hashkey
- *
- * Used to generate the hashkey in to the jobQueueMap.
+ * Generate the hashkey for the jobQueueMap.
*/
private Integer getHashKey(String key) {
int code = key.hashCode();
private class JobCallback implements FutureCallback<List<Void>> {
private final JobEntry jobEntry;
- public JobCallback(JobEntry jobEntry) {
+ JobCallback(JobEntry jobEntry) {
this.jobEntry = jobEntry;
}
/**
- * @param voids
* This implies that all the future instances have returned success. -- TODO: Confirm this
*/
@Override
}
/**
- *
- * @param throwable
- * This method is used to handle failure callbacks.
+ * Handle failure callbacks.
* If more retry needed, the retrycount is decremented and mainworker is executed again.
* After retries completed, rollbackworker is executed.
* If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
*/
-
@Override
public void onFailure(Throwable throwable) {
LOG.warn("Job: {} failed with exception: {} {}", jobEntry, throwable.getClass().getSimpleName(),
int retryCount = jobEntry.decrementRetryCountAndGet();
if ( retryCount > 0) {
- long waitTime = (RETRY_WAIT_BASE_TIME * 10)/retryCount;
+ long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
scheduledExecutorService.schedule(
- () -> {
- MainTask worker = new MainTask(jobEntry);
- fjPool.execute(worker);
- },
- waitTime,
- TimeUnit.MILLISECONDS);
+ () -> {
+ MainTask worker = new MainTask(jobEntry);
+ fjPool.execute(worker);
+ },
+ waitTime,
+ TimeUnit.MILLISECONDS);
return;
}
}
/**
- * RollbackTask is used to execute the RollbackCallable provided by the application
- * in the eventuality of a failure.
+ * Execute the RollbackCallable provided by the application in the eventuality of a failure.
*/
-
private class RollbackTask implements Runnable {
private final JobEntry jobEntry;
- public RollbackTask(JobEntry jobEntry) {
+ RollbackTask(JobEntry jobEntry) {
this.jobEntry = jobEntry;
}
@Override
+ @SuppressWarnings("checkstyle:illegalcatch")
public void run() {
RollbackCallable callable = jobEntry.getRollbackWorker();
callable.setFutures(jobEntry.getFutures());
try {
futures = callable.call();
- } catch (Exception e){
+ } catch (Exception e) {
LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
}
}
/**
- * MainTask is used to execute the MainWorker callable.
+ * Execute the MainWorker callable.
*/
-
private class MainTask implements Runnable {
private final JobEntry jobEntry;
- public MainTask(JobEntry jobEntry) {
+ MainTask(JobEntry jobEntry) {
this.jobEntry = jobEntry;
}
@Override
+ @SuppressWarnings("checkstyle:illegalcatch")
public void run() {
List<ListenableFuture<Void>> futures = null;
long jobStartTimestamp = System.currentTimeMillis();
futures = jobEntry.getMainWorker().call();
long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
- } catch (Exception e){
+ } catch (Exception e) {
LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
}
private class JobQueueHandler implements Runnable {
@Override
+ @SuppressWarnings("checkstyle:illegalcatch")
public void run() {
LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
while (true) {