Merge "Bug 1666: Fixing the clustering config file issue"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
index 9a6d12fb1872fc0a96556b06133a48894e14a532..3fde8d360f8af6df8cb0bcd705a9e3289d9fd35e 100644 (file)
@@ -9,12 +9,14 @@ package org.opendaylight.controller.md.sal.dom.broker.impl;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
 
 import javax.annotation.concurrent.GuardedBy;
 
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +70,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
     private final ListeningExecutorService executor;
 
+    private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+
     /**
      *
      * Construct DOMDataCommitCoordinator which uses supplied executor to
@@ -79,6 +83,10 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
     }
 
+    public DurationStatsTracker getCommitStatsTracker() {
+        return commitStatsTracker;
+    }
+
     @Override
     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
@@ -86,8 +94,19 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
         Preconditions.checkArgument(listener != null, "Listener must not be null");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
-        ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
-                transaction, cohorts, listener));
+
+        ListenableFuture<Void> commitFuture = null;
+        try {
+            commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
+                    listener, commitStatsTracker));
+        } catch(RejectedExecutionException e) {
+            LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
+                      executor, e);
+            return Futures.immediateFailedCheckedFuture(
+                    new TransactionCommitFailedException(
+                        "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
+        }
+
         if (listener.isPresent()) {
             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
         }
@@ -143,21 +162,25 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
         private final DOMDataWriteTransaction tx;
         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
+        private final DurationStatsTracker commitStatTracker;
 
         @GuardedBy("this")
         private CommitPhase currentPhase;
 
         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
-                final Optional<DOMDataCommitErrorListener> listener) {
+                final Optional<DOMDataCommitErrorListener> listener,
+                final DurationStatsTracker commitStatTracker) {
             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
             this.currentPhase = CommitPhase.SUBMITTED;
+            this.commitStatTracker = commitStatTracker;
         }
 
         @Override
         public Void call() throws TransactionCommitFailedException {
 
+            long startTime = System.nanoTime();
             try {
                 canCommitBlocking();
                 preCommitBlocking();
@@ -167,6 +190,10 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
                 abortBlocking(e);
                 throw e;
+            } finally {
+                if(commitStatTracker != null) {
+                    commitStatTracker.addDuration(System.nanoTime() - startTime);
+                }
             }
         }