*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
+ public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
- transaction, cohorts, listener));
+
+ ListenableFuture<Void> commitFuture = null;
+ try {
+ commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
+ } catch(RejectedExecutionException e) {
+ LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
+ executor, e);
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException(
+ "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
+ }
+
if (listener.isPresent()) {
Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
}
- return commitFuture;
+
+ return MappingCheckedFuture.create(commitFuture,
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
* support of cancelation.
*
*/
- private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
+ private static class CommitCoordinationTask implements Callable<Void> {
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
}
@Override
- public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
+ public Void call() throws TransactionCommitFailedException {
try {
canCommitBlocking();
preCommitBlocking();
- return commitBlocking();
+ commitBlocking();
+ return null;
} catch (TransactionCommitFailedException e) {
LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
abortBlocking(e);
* If one of cohorts failed preCommit
*
*/
- private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
+ private void commitBlocking() throws TransactionCommitFailedException {
commitAll().checkedGet();
- return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.<RpcError> emptySet());
}
/**
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
+ return MappingCheckedFuture.create(compositeResult,
+ TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
}
/**
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(compositeResult,
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
}
ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
- return Futures
- .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(allSuccessFuture,
+ TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
}