Bug 1666: Fixing the clustering config file issue
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
index 8b9eb445fd45b4d19f5a3f6523a7ce2a711ef310..521e2d0e731af06ac972ce2cce28f75a347ba490 100644 (file)
@@ -9,12 +9,14 @@ package org.opendaylight.controller.md.sal.dom.broker.impl;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
 
 import javax.annotation.concurrent.GuardedBy;
 
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,13 +87,24 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
         Preconditions.checkArgument(listener != null, "Listener must not be null");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
-        ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
-                transaction, cohorts, listener));
+
+        ListenableFuture<Void> commitFuture = null;
+        try {
+            commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
+        } catch(RejectedExecutionException e) {
+            LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
+                      executor, e);
+            return Futures.immediateFailedCheckedFuture(
+                    new TransactionCommitFailedException(
+                        "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
+        }
+
         if (listener.isPresent()) {
             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
         }
 
-        return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+        return MappingCheckedFuture.create(commitFuture,
+                TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
     }
 
     /**
@@ -285,7 +298,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
              */
             @SuppressWarnings({ "unchecked", "rawtypes" })
             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
-            return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
+            return MappingCheckedFuture.create(compositeResult,
+                                         TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
         }
 
         /**
@@ -316,7 +330,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
              */
             @SuppressWarnings({ "unchecked", "rawtypes" })
             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
-            return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+            return MappingCheckedFuture.create(compositeResult,
+                                     TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
         }
 
         /**
@@ -342,8 +357,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
             }
             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
             ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
-            return Futures
-                    .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
+            return MappingCheckedFuture.create(allSuccessFuture,
+                                       TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
 
         }