- private void maybeFinish() {
- final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- if (elapsed >= runtimeNanos) {
- LOG.debug("Reached max running time, waiting for futures to complete.");
- scheduledFuture.cancel(false);
-
- final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
- try {
- // Timeout from cds should be 2 minutes so leave some leeway.
- allFutures.get(125, TimeUnit.SECONDS);
-
- LOG.debug("All futures completed successfully.");
-
- final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
- .setAllTx(allTx)
- .setInsertTx(insertTx)
- .setDeleteTx(deleteTx)
- .build();
- completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
- .withResult(output).build());
- } catch (ExecutionException e) {
- LOG.error("Write transactions failed.", e.getCause());
- completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
- } catch (InterruptedException | TimeoutException e) {
- LOG.error("Write transactions failed.", e);
- completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION,
- "Final submit was timed out by the test provider or was interrupted", e).build());
-
- for (int i = 0; i < futures.size(); i++) {
- final ListenableFuture<Void> future = futures.get(i);
+ @Override
+ void runFailed(final Throwable cause) {
+ future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+ .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build());
+ }