-
- private static class SubmitCoordinationTask implements Callable<Void> {
-
- private static final Logger LOG = LoggerFactory.getLogger(SubmitCoordinationTask.class);
-
- private final String identifier;
- private final Collection<DOMDataTreeShardWriteTransaction> transactions;
-
- SubmitCoordinationTask(final String identifier,
- final Collection<DOMDataTreeShardWriteTransaction> transactions) {
- this.identifier = identifier;
- this.transactions = transactions;
- }
-
- @Override
- public Void call() throws TransactionCommitFailedException {
-
- try {
- LOG.debug("Producer {}, submit started", identifier);
- submitBlocking();
-
- return null;
- } catch (final TransactionCommitFailedException e) {
- LOG.warn("Failure while submitting transaction for producer {}", identifier, e);
- //FIXME abort here
- throw e;
- }
- }
-
- void submitBlocking() throws TransactionCommitFailedException {
- for (final ListenableFuture<?> commit : submitAll()) {
- try {
- commit.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new TransactionCommitFailedException("Submit failed", e);
- }
- }
- }
-
- private ListenableFuture<?>[] submitAll() {
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[transactions.size()];
- int i = 0;
- for (final DOMDataTreeShardWriteTransaction tx : transactions) {
- ops[i++] = tx.submit();
- }
- return ops;
- }
- }