X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataCommitCoordinatorImpl.java;h=521e2d0e731af06ac972ce2cce28f75a347ba490;hb=89b2806785f86556e9c32026a1cd38a3a772f1bc;hp=8b9eb445fd45b4d19f5a3f6523a7ce2a711ef310;hpb=c222e37f2a0f0f3f6266242fbea2d3b018f4e6e3;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index 8b9eb445fd..521e2d0e73 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -9,12 +9,14 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,13 +87,24 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); Preconditions.checkArgument(listener != null, "Listener must not be null"); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - ListenableFuture commitFuture = executor.submit(new CommitCoordinationTask( - transaction, cohorts, listener)); + + ListenableFuture commitFuture = null; + try { + commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener)); + } catch(RejectedExecutionException e) { + LOG.error("The commit executor's queue is full - submit task was rejected. \n" + + executor, e); + return Futures.immediateFailedCheckedFuture( + new TransactionCommitFailedException( + "Could not submit the commit task - the commit queue capacity has been exceeded.", e)); + } + if (listener.isPresent()) { Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); } - return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(commitFuture, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -285,7 +298,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { */ @SuppressWarnings({ "unchecked", "rawtypes" }) ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); + return MappingCheckedFuture.create(compositeResult, + TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); } /** @@ -316,7 +330,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { */ @SuppressWarnings({ "unchecked", "rawtypes" }) ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(compositeResult, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -342,8 +357,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); ListenableFuture allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION); - return Futures - .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(allSuccessFuture, + TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); }