*/
package org.opendaylight.controller.cluster.databroker;
+import static org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER;
+import static org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER;
+import static org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER;
+
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
import org.opendaylight.yangtools.util.DurationStatisticsTracker;
-import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Executor clientFutureCallbackExecutor;
public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
- Executor listenableFutureExecutor) {
+ final Executor listenableFutureExecutor) {
this(datastores, listenableFutureExecutor, DurationStatisticsTracker.createConcurrent());
}
public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
- Executor listenableFutureExecutor, DurationStatisticsTracker commitStatsTracker) {
+ final Executor listenableFutureExecutor, final DurationStatisticsTracker commitStatsTracker) {
super(datastores);
this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
this.commitStatsTracker = Preconditions.checkNotNull(commitStatsTracker);
}
@Override
- protected CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
- Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ protected CheckedFuture<Void, TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
doCanCommit(clientSubmitFuture, transaction, cohorts);
- return MappingCheckedFuture.create(clientSubmitFuture,
- TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(clientSubmitFuture, COMMIT_ERROR_MAPPER);
}
private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
// Not using Futures.allAsList here to avoid its internal overhead.
FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
@Override
- public void onSuccess(Boolean result) {
+ public void onSuccess(final Boolean result) {
if (result == null || !result) {
- handleException(clientSubmitFuture, transaction, cohorts,
- CAN_COMMIT, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER,
- new TransactionCommitFailedException(
- "Can Commit failed, no detailed cause available."));
+ handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER,
+ new TransactionCommitFailedException("Can Commit failed, no detailed cause available."));
+ } else if (!cohortIterator.hasNext()) {
+ // All cohorts completed successfully - we can move on to the preCommit phase
+ doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
- if (!cohortIterator.hasNext()) {
- // All cohorts completed successfully - we can move on to the preCommit phase
- doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
- } else {
- ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
- Futures.addCallback(canCommitFuture, this, MoreExecutors.directExecutor());
- }
+ Futures.addCallback(cohortIterator.next().canCommit(), this, MoreExecutors.directExecutor());
}
}
@Override
- public void onFailure(Throwable failure) {
- handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT,
- TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER, failure);
+ public void onFailure(final Throwable failure) {
+ handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure);
}
};
// Not using Futures.allAsList here to avoid its internal overhead.
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
- public void onSuccess(Void notUsed) {
+ public void onSuccess(final Void notUsed) {
if (!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the commit phase
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
}
@Override
- public void onFailure(Throwable failure) {
- handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT,
- TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER, failure);
+ public void onFailure(final Throwable failure) {
+ handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT, PRE_COMMIT_MAPPER, failure);
}
};
// Not using Futures.allAsList here to avoid its internal overhead.
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
- public void onSuccess(Void notUsed) {
+ public void onSuccess(final Void notUsed) {
if (!cohortIterator.hasNext()) {
// All cohorts completed successfully - we're done.
commitStatsTracker.addDuration(System.nanoTime() - startTime);
}
@Override
- public void onFailure(Throwable throwable) {
- handleException(clientSubmitFuture, transaction, cohorts, COMMIT,
- TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER, throwable);
+ public void onFailure(final Throwable throwable) {
+ handleException(clientSubmitFuture, transaction, cohorts, COMMIT, COMMIT_ERROR_MAPPER, throwable);
}
};
ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(canCommitFutures);
Futures.addCallback(combinedFuture, new FutureCallback<List<Void>>() {
@Override
- public void onSuccess(List<Void> notUsed) {
+ public void onSuccess(final List<Void> notUsed) {
// Propagate the original exception to the client.
LOG.debug("Tx: {} aborted successfully", transaction.getIdentifier());
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), failure);
}
}, MoreExecutors.directExecutor());
private final Executor listenerExecutor;
- AsyncNotifyingSettableFuture(Executor listenerExecutor) {
+ AsyncNotifyingSettableFuture(final Executor listenerExecutor) {
this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
}
}
@Override
- protected boolean setException(Throwable throwable) {
+ protected boolean setException(final Throwable throwable) {
ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
try {
return super.setException(throwable);