import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
-
+ private static final long RETRY_WAIT_BASE_TIME = 100;
+ private ScheduledExecutorService scheduledExecutorService =
+ Executors.newScheduledThreadPool(5);
private final ForkJoinPool fjPool;
private final Map<Integer,Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
return;
}
- if (jobEntry.decrementRetryCountAndGet() > 0) {
- MainTask worker = new MainTask(jobEntry);
- fjPool.execute(worker);
+ int retryCount = jobEntry.decrementRetryCountAndGet();
+ if ( retryCount > 0) {
+ long waitTime = (RETRY_WAIT_BASE_TIME * 10)/retryCount;
+ scheduledExecutorService.schedule(
+ () -> {
+ MainTask worker = new MainTask(jobEntry);
+ fjPool.execute(worker);
+ },
+ waitTime,
+ TimeUnit.MILLISECONDS);
return;
}