X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataCommitCoordinatorImpl.java;h=521e2d0e731af06ac972ce2cce28f75a347ba490;hp=540e2fe20ce52208ffd4a659b5e21c70eafe4b10;hb=5bcd1acc0d6407ff27365e3dce7810589eae2a2d;hpb=430114ac078864cd36fbe7d543440dde029a1dc7 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index 540e2fe20c..521e2d0e73 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -6,20 +6,17 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,18 +81,30 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } @Override - public ListenableFuture> submit(final DOMDataWriteTransaction transaction, + public CheckedFuture submit(final DOMDataWriteTransaction transaction, final Iterable cohorts, final Optional listener) { Preconditions.checkArgument(transaction != null, "Transaction must not be null."); Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); Preconditions.checkArgument(listener != null, "Listener must not be null"); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - ListenableFuture> commitFuture = executor.submit(new CommitCoordinationTask( - transaction, cohorts, listener)); + + ListenableFuture commitFuture = null; + try { + commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener)); + } catch(RejectedExecutionException e) { + LOG.error("The commit executor's queue is full - submit task was rejected. \n" + + executor, e); + return Futures.immediateFailedCheckedFuture( + new TransactionCommitFailedException( + "Could not submit the commit task - the commit queue capacity has been exceeded.", e)); + } + if (listener.isPresent()) { Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); } - return commitFuture; + + return MappingCheckedFuture.create(commitFuture, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -141,7 +150,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * support of cancelation. * */ - private static class CommitCoordinationTask implements Callable> { + private static class CommitCoordinationTask implements Callable { private final DOMDataWriteTransaction tx; private final Iterable cohorts; @@ -158,12 +167,13 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } @Override - public RpcResult call() throws TransactionCommitFailedException { + public Void call() throws TransactionCommitFailedException { try { canCommitBlocking(); preCommitBlocking(); - return commitBlocking(); + commitBlocking(); + return null; } catch (TransactionCommitFailedException e) { LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e); abortBlocking(e); @@ -219,9 +229,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * If one of cohorts failed preCommit * */ - private RpcResult commitBlocking() throws TransactionCommitFailedException { + private void commitBlocking() throws TransactionCommitFailedException { commitAll().checkedGet(); - return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections. emptySet()); } /** @@ -289,7 +298,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { */ @SuppressWarnings({ "unchecked", "rawtypes" }) ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); + return MappingCheckedFuture.create(compositeResult, + TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); } /** @@ -320,7 +330,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { */ @SuppressWarnings({ "unchecked", "rawtypes" }) ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(compositeResult, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -346,8 +357,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); ListenableFuture allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION); - return Futures - .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(allSuccessFuture, + TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); }