X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2FConcurrentDOMDataBroker.java;h=cef367fd3ed3921192d4344c335fcf999c77efd3;hb=18991f44b807ab6f06fcec76216b7f70b900b0f4;hp=1738d1d3092ac2fba554f33489a9557a97d7599d;hpb=62468daa9029368b321b6e1e18fa9cfa0ae994b4;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java index 1738d1d309..cef367fd3e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java @@ -18,11 +18,7 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; @@ -79,118 +75,75 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker { @Override protected FluentFuture commit(final DOMDataTreeWriteTransaction transaction, - final Collection cohorts) { + final DOMStoreThreePhaseCommitCohort cohort) { checkArgument(transaction != null, "Transaction must not be null."); - checkArgument(cohorts != null, "Cohorts must not be null."); + checkArgument(cohort != null, "Cohorts must not be null."); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - if (cohorts.isEmpty()) { - return CommitInfo.emptyFluentFuture(); - } - final AsyncNotifyingSettableFuture clientSubmitFuture = new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor); - doCanCommit(clientSubmitFuture, transaction, cohorts); - + doCanCommit(clientSubmitFuture, transaction, cohort); return FluentFuture.from(clientSubmitFuture); } private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture, final DOMDataTreeWriteTransaction transaction, - final Collection cohorts) { - + final DOMStoreThreePhaseCommitCohort cohort) { final long startTime = System.nanoTime(); - final Iterator cohortIterator = cohorts.iterator(); - - // Not using Futures.allAsList here to avoid its internal overhead. - FutureCallback futureCallback = new FutureCallback<>() { + Futures.addCallback(cohort.canCommit(), new FutureCallback<>() { @Override public void onSuccess(final Boolean result) { if (result == null || !result) { - handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, - new TransactionCommitFailedException("Can Commit failed, no detailed cause available.")); - } else if (!cohortIterator.hasNext()) { - // All cohorts completed successfully - we can move on to the preCommit phase - doPreCommit(startTime, clientSubmitFuture, transaction, cohorts); + onFailure(new TransactionCommitFailedException("Can Commit failed, no detailed cause available.")); } else { - Futures.addCallback(cohortIterator.next().canCommit(), this, MoreExecutors.directExecutor()); + doPreCommit(startTime, clientSubmitFuture, transaction, cohort); } } @Override public void onFailure(final Throwable failure) { - handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure); + handleException(clientSubmitFuture, transaction, cohort, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure); } - }; - - Futures.addCallback(cohortIterator.next().canCommit(), futureCallback, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, - final DOMDataTreeWriteTransaction transaction, - final Collection cohorts) { - - final Iterator cohortIterator = cohorts.iterator(); - - // Not using Futures.allAsList here to avoid its internal overhead. - FutureCallback futureCallback = new FutureCallback<>() { + final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) { + Futures.addCallback(cohort.preCommit(), new FutureCallback<>() { @Override public void onSuccess(final Empty result) { - if (!cohortIterator.hasNext()) { - // All cohorts completed successfully - we can move on to the commit phase - doCommit(startTime, clientSubmitFuture, transaction, cohorts); - } else { - Futures.addCallback(cohortIterator.next().preCommit(), this, MoreExecutors.directExecutor()); - } + doCommit(startTime, clientSubmitFuture, transaction, cohort); } @Override public void onFailure(final Throwable failure) { - handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT, PRE_COMMIT_MAPPER, failure); + handleException(clientSubmitFuture, transaction, cohort, PRE_COMMIT, PRE_COMMIT_MAPPER, failure); } - }; - - Futures.addCallback(cohortIterator.next().preCommit(), futureCallback, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, - final DOMDataTreeWriteTransaction transaction, - final Collection cohorts) { - - final Iterator cohortIterator = cohorts.iterator(); - - // Not using Futures.allAsList here to avoid its internal overhead. - final FutureCallback futureCallback = new FutureCallback<>() { + final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) { + Futures.addCallback(cohort.commit(), new FutureCallback() { @Override public void onSuccess(final CommitInfo result) { - if (!cohortIterator.hasNext()) { - // All cohorts completed successfully - we're done. - commitStatsTracker.addDuration(System.nanoTime() - startTime); - - clientSubmitFuture.set(); - } else { - Futures.addCallback(cohortIterator.next().commit(), this, MoreExecutors.directExecutor()); - } + commitStatsTracker.addDuration(System.nanoTime() - startTime); + clientSubmitFuture.set(); } @Override public void onFailure(final Throwable throwable) { - handleException(clientSubmitFuture, transaction, cohorts, COMMIT, COMMIT_ERROR_MAPPER, throwable); + handleException(clientSubmitFuture, transaction, cohort, COMMIT, COMMIT_ERROR_MAPPER, throwable); } - }; - - Futures.addCallback(cohortIterator.next().commit(), futureCallback, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture, - final DOMDataTreeWriteTransaction transaction, - final Collection cohorts, - final String phase, final TransactionCommitFailedExceptionMapper exMapper, - final Throwable throwable) { - + final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort, + final String phase, final TransactionCommitFailedExceptionMapper exMapper, final Throwable throwable) { if (clientSubmitFuture.isDone()) { // We must have had failures from multiple cohorts. return; @@ -199,29 +152,21 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker { // Use debug instead of warn level here because this exception gets propagate back to the caller via the Future LOG.debug("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, throwable); - // Transaction failed - tell all cohorts to abort. - @SuppressWarnings("unchecked") - ListenableFuture[] canCommitFutures = new ListenableFuture[cohorts.size()]; - int index = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - canCommitFutures[index++] = cohort.abort(); - } - // Propagate the original exception final Exception e; if (throwable instanceof NoShardLeaderException || throwable instanceof ShardLeaderNotRespondingException) { e = new DataStoreUnavailableException(throwable.getMessage(), throwable); } else if (throwable instanceof Exception) { - e = (Exception)throwable; + e = (Exception) throwable; } else { e = new RuntimeException("Unexpected error occurred", throwable); } clientSubmitFuture.setException(exMapper.apply(e)); - ListenableFuture> combinedFuture = Futures.allAsList(canCommitFutures); - Futures.addCallback(combinedFuture, new FutureCallback>() { + // abort + Futures.addCallback(cohort.abort(), new FutureCallback() { @Override - public void onSuccess(final List result) { + public void onSuccess(final Empty result) { // Propagate the original exception to the client. LOG.debug("Tx: {} aborted successfully", transaction.getIdentifier()); }