import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ListeningExecutorService executor;
+ private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+
/**
*
* Construct DOMDataCommitCoordinator which uses supplied executor to
this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
}
+ public DurationStatsTracker getCommitStatsTracker() {
+ return commitStatsTracker;
+ }
+
@Override
public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
- transaction, cohorts, listener));
+
+ ListenableFuture<Void> commitFuture = null;
+ try {
+ commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
+ listener, commitStatsTracker));
+ } catch(RejectedExecutionException e) {
+ LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
+ executor, e);
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException(
+ "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
+ }
+
if (listener.isPresent()) {
Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
}
- return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(commitFuture,
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
+ private final DurationStatsTracker commitStatTracker;
@GuardedBy("this")
private CommitPhase currentPhase;
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
- final Optional<DOMDataCommitErrorListener> listener) {
+ final Optional<DOMDataCommitErrorListener> listener,
+ final DurationStatsTracker commitStatTracker) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
this.currentPhase = CommitPhase.SUBMITTED;
+ this.commitStatTracker = commitStatTracker;
}
@Override
public Void call() throws TransactionCommitFailedException {
+ long startTime = System.nanoTime();
try {
canCommitBlocking();
preCommitBlocking();
LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
abortBlocking(e);
throw e;
+ } finally {
+ if(commitStatTracker != null) {
+ commitStatTracker.addDuration(System.nanoTime() - startTime);
+ }
}
}
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
+ return MappingCheckedFuture.create(compositeResult,
+ TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
}
/**
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(compositeResult,
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
}
ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
- return Futures
- .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(allSuccessFuture,
+ TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
}