import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
* commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
* (ie async) per transaction but multiple transaction commits can run concurrent.
*
* @author Thomas Pantelis
*/
* commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
* (ie async) per transaction but multiple transaction commits can run concurrent.
*
* @author Thomas Pantelis
*/
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
final AsyncNotifyingSettableFuture clientSubmitFuture =
new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
final AsyncNotifyingSettableFuture clientSubmitFuture =
new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
new TransactionCommitFailedException(
"Can Commit failed, no detailed cause available."));
} else {
new TransactionCommitFailedException(
"Can Commit failed, no detailed cause available."));
} else {
// All cohorts completed successfully - we can move on to the preCommit phase
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
// All cohorts completed successfully - we can move on to the preCommit phase
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
- Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
// All cohorts completed successfully - we can move on to the commit phase
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
// All cohorts completed successfully - we can move on to the commit phase
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> preCommitFuture = cohort.preCommit();
- Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
// All cohorts completed successfully - we're done.
commitStatsTracker.addDuration(System.nanoTime() - startTime);
clientSubmitFuture.set();
// All cohorts completed successfully - we're done.
commitStatsTracker.addDuration(System.nanoTime() - startTime);
clientSubmitFuture.set();
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> commitFuture = cohort.commit();
- Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);