import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.ArrayDeque;
-import java.util.Queue;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
- static final int SECOND_AS_NANO = 1000000000;
+ static final int SECOND_AS_NANO = 1_000_000_000;
//2^20 as in the model
static final int MAX_ITEM = 1048576;
static final long INIT_TX_TIMEOUT_SECONDS = 125;
- private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
+ private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
+ private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread();
+ private final Collection<ListenableFuture<Void>> futures = new HashSet<>();
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final long runtimeNanos;
private final long delayNanos;
state = State.WAITING;
scheduledFuture.cancel(false);
scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- executor.shutdown();
}
return;
public void onFailure(final Throwable cause) {
txFailure(execFuture, txId, cause);
}
- });
+ }, executor);
}
- final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+ final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
LOG.debug("Future #{} completed successfully", txId);
futures.remove(execFuture);
}
}
- final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+ final void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
LOG.debug("Future #{} failed", txId, cause);
futures.remove(execFuture);
case WAITING:
state = State.FAILED;
scheduledFuture.cancel(false);
- executor.shutdown();
runFailed(cause);
break;
default:
}
}
- private synchronized void checkComplete() {
+ private void checkComplete() {
final int size = futures.size();
if (size == 0) {
return;
LOG.debug("Completed waiting for all futures");
state = State.SUCCESSFUL;
scheduledFuture.cancel(false);
- executor.shutdown();
runSuccessful(txCounter);
return true;
}