- final ListenableFuture<Void> future = tx.submit();
- if (LOG.isDebugEnabled()) {
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- LOG.debug("Future #{} completed successfully", offset);
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- LOG.debug("Future #{} failed", offset, cause);
- }
- });
- }
-
- return future;
- }
-
- private void maybeFinish() {
- final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- if (elapsed >= runtimeNanos) {
- LOG.debug("Reached max running time, waiting for futures to complete.");
- scheduledFuture.cancel(false);
-
- final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
- try {
- // Timeout from cds should be 2 minutes so leave some leeway.
- allFutures.get(125, TimeUnit.SECONDS);
-
- LOG.debug("All futures completed successfully.");
-
- final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
- .setAllTx(allTx)
- .setInsertTx(insertTx)
- .setDeleteTx(deleteTx)
- .build();
-
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
- .withResult(output).build());
-
- executor.shutdown();
- } catch (final ExecutionException e) {
- LOG.error("Write transactions failed.", e.getCause());
-
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
- } catch (InterruptedException | TimeoutException e) {
- LOG.error("Write transactions failed.", e);
-
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION,
- "Final submit was timed out by the test provider or was interrupted", e).build());
-
- for (int i = 0; i < futures.size(); i++) {
- final ListenableFuture<Void> future = futures.get(i);
-
- try {
- future.get(0, TimeUnit.NANOSECONDS);
- } catch (final TimeoutException fe) {
- LOG.warn("Future #{}/{} not completed yet", i, futures.size());
- } catch (final ExecutionException fe) {
- LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
- } catch (final InterruptedException fe) {
- LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
- }
- }
- } catch (Exception exception) {
- LOG.error("Write transactions failed.", exception);
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
-
- executor.shutdown();
- }
- }
- }
-
- private interface RandomnessProvider {
- int nextInt(int bound);
- }
-
- private static class NonConflictingProvider implements RandomnessProvider {
-
- private final SplittableRandom random = new SplittableRandom();
- private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
-
- @Override
- public int nextInt(int bound) {
- int nextInt;
- do {
- nextInt = random.nextInt(bound);
- } while (previousNumbers.contains(nextInt));
-
- if (previousNumbers.size() > 100000) {
- previousNumbers.iterator().remove();
- }
- previousNumbers.add(nextInt);
-
- return nextInt;
- }
- }
-
- private static class BasicProvider implements RandomnessProvider {
-
- private final SplittableRandom random = new SplittableRandom();
-
- @Override
- public int nextInt(int bound) {
- return random.nextInt(bound);
- }