X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataCommitCoordinatorImpl.java;h=3fde8d360f8af6df8cb0bcd705a9e3289d9fd35e;hb=f71a2c712690ecfd1260543ab58d8e16453f7918;hp=8b9eb445fd45b4d19f5a3f6523a7ce2a711ef310;hpb=17d82f582a6bc13c78be3b19954ff8c021180e93;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index 8b9eb445fd..3fde8d360f 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -9,12 +9,15 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.util.DurationStatsTracker; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +70,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { private final ListeningExecutorService executor; + private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker(); + /** * * Construct DOMDataCommitCoordinator which uses supplied executor to @@ -78,6 +83,10 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { this.executor = Preconditions.checkNotNull(executor, "executor must not be null."); } + public DurationStatsTracker getCommitStatsTracker() { + return commitStatsTracker; + } + @Override public CheckedFuture submit(final DOMDataWriteTransaction transaction, final Iterable cohorts, final Optional listener) { @@ -85,13 +94,25 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); Preconditions.checkArgument(listener != null, "Listener must not be null"); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - ListenableFuture commitFuture = executor.submit(new CommitCoordinationTask( - transaction, cohorts, listener)); + + ListenableFuture commitFuture = null; + try { + commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, + listener, commitStatsTracker)); + } catch(RejectedExecutionException e) { + LOG.error("The commit executor's queue is full - submit task was rejected. \n" + + executor, e); + return Futures.immediateFailedCheckedFuture( + new TransactionCommitFailedException( + "Could not submit the commit task - the commit queue capacity has been exceeded.", e)); + } + if (listener.isPresent()) { Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); } - return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(commitFuture, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -141,21 +162,25 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { private final DOMDataWriteTransaction tx; private final Iterable cohorts; + private final DurationStatsTracker commitStatTracker; @GuardedBy("this") private CommitPhase currentPhase; public CommitCoordinationTask(final DOMDataWriteTransaction transaction, final Iterable cohorts, - final Optional listener) { + final Optional listener, + final DurationStatsTracker commitStatTracker) { this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); this.currentPhase = CommitPhase.SUBMITTED; + this.commitStatTracker = commitStatTracker; } @Override public Void call() throws TransactionCommitFailedException { + long startTime = System.nanoTime(); try { canCommitBlocking(); preCommitBlocking(); @@ -165,6 +190,10 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e); abortBlocking(e); throw e; + } finally { + if(commitStatTracker != null) { + commitStatTracker.addDuration(System.nanoTime() - startTime); + } } } @@ -285,7 +314,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { */ @SuppressWarnings({ "unchecked", "rawtypes" }) ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); + return MappingCheckedFuture.create(compositeResult, + TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); } /** @@ -316,7 +346,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { */ @SuppressWarnings({ "unchecked", "rawtypes" }) ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(compositeResult, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -342,8 +373,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); ListenableFuture allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION); - return Futures - .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(allSuccessFuture, + TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); }