import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.LoggerFactory;
/**
- * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3
+ * ConcurrentDOMDataBroker commits transactions concurrently. The 3
* commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
* (ie async) per transaction but multiple transaction commits can run concurrent.
*
* @author Thomas Pantelis
*/
-public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
+ if(cohorts.isEmpty()){
+ return Futures.immediateCheckedFuture(null);
+ }
+
final AsyncNotifyingSettableFuture clientSubmitFuture =
new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
final long startTime = System.nanoTime();
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
new TransactionCommitFailedException(
"Can Commit failed, no detailed cause available."));
} else {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the preCommit phase
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ } else {
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
}
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
- Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the commit phase
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ } else {
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> preCommitFuture = cohort.preCommit();
- Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we're done.
commitStatsTracker.addDuration(System.nanoTime() - startTime);
clientSubmitFuture.set();
+ } else {
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> commitFuture = cohort.commit();
- Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,