* This thread only removes from futures set.
*/
private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
- private final Collection<ListenableFuture<Void>> futures = Collections.synchronizedSet(new HashSet<>());
+ private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final long runtimeNanos;
private final long delayNanos;
// Not completed yet: create a transaction and hook it up
final long txId = txCounter++;
- final ListenableFuture<Void> execFuture = execWrite(txId);
+ final ListenableFuture<?> execFuture = execWrite(txId);
LOG.debug("New future #{} allocated", txId);
// Ordering is important: we need to add the future before hooking the callback
futures.add(execFuture);
- Futures.addCallback(execFuture, new FutureCallback<Void>() {
+ Futures.addCallback(execFuture, new FutureCallback<Object>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final Object result) {
txSuccess(execFuture, txId);
}
return false;
}
- final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+ final void txSuccess(final ListenableFuture<?> execFuture, final long txId) {
LOG.debug("Future #{} completed successfully", txId);
futures.remove(execFuture);
}
}
- final void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+ final void txFailure(final ListenableFuture<?> execFuture, final long txId, final Throwable cause) {
LOG.debug("Future #{} failed", txId, cause);
futures.remove(execFuture);
synchronized (futures) {
int offset = 0;
- for (ListenableFuture<Void> future : futures) {
+ for (ListenableFuture<?> future : futures) {
try {
future.get(0, TimeUnit.NANOSECONDS);
} catch (final TimeoutException e) {
runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
}
- abstract ListenableFuture<Void> execWrite(long txId);
+ abstract ListenableFuture<?> execWrite(long txId);
abstract void runFailed(Throwable cause);