- private void maybeFinish(final long current) {
- if ((current - startTime) > timeToTake) {
- LOG.debug("Reached max running time, waiting for futures to complete.");
- scheduledFuture.cancel(false);
-
- final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
- try {
- allFutures.get(30, TimeUnit.SECONDS);
-
- LOG.debug("All futures completed successfully.");
-
- final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
- .setAllTx(allTx)
- .setInsertTx(insertTx)
- .setDeleteTx(deleteTx)
- .build();
-
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
- .withResult(output).build());
-
- executor.shutdown();
- } catch (InterruptedException | ExecutionException | TimeoutException exception) {
- LOG.error("Write transactions failed.", exception);
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
-
- executor.shutdown();
- }
- }
- }
-
- private interface RandomnessProvider {
- int nextInt(int bound);
- }
-
- private static class NonConflictingProvider implements RandomnessProvider {
-
- private final SplittableRandom random = new SplittableRandom();
- private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
-
- @Override
- public int nextInt(int bound) {
- int nextInt;
- do {
- nextInt = random.nextInt(bound);
- } while (previousNumbers.contains(nextInt));
-
- if (previousNumbers.size() > 100000) {
- previousNumbers.iterator().remove();
- }
- previousNumbers.add(nextInt);
-
- return nextInt;
- }
- }
-
- private static class BasicProvider implements RandomnessProvider {
-
- private final SplittableRandom random = new SplittableRandom();
-
- @Override
- public int nextInt(int bound) {
- return random.nextInt(bound);
- }
- }
-
- private interface TxProvider {
-
- DOMDataWriteTransaction createTransaction();