Do not allow multi-datastore transactions
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / CommitCoordinationTask.java
index 49af6d65ff14c0461ff161081249bab1c626994e..dd5280462c79c5f4876628725fc5cf23649ca3d6 100644 (file)
@@ -10,10 +10,7 @@ package org.opendaylight.mdsal.dom.broker;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import org.opendaylight.mdsal.common.api.CommitInfo;
@@ -31,10 +28,9 @@ sealed class CommitCoordinationTask implements Callable<CommitInfo> {
     static final class WithTracker extends CommitCoordinationTask {
         private final DurationStatisticsTracker commitStatTracker;
 
-        WithTracker(final DOMDataTreeWriteTransaction transaction,
-                final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
+        WithTracker(final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort,
                 final DurationStatisticsTracker commitStatTracker) {
-            super(transaction, cohorts);
+            super(transaction, cohort);
             this.commitStatTracker = requireNonNull(commitStatTracker);
         }
 
@@ -57,13 +53,13 @@ sealed class CommitCoordinationTask implements Callable<CommitInfo> {
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
-    private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+
+    private final DOMStoreThreePhaseCommitCohort cohort;
     private final DOMDataTreeWriteTransaction tx;
 
-    CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction,
-            final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+    CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
         tx = requireNonNull(transaction, "transaction must not be null");
-        this.cohorts = requireNonNull(cohorts, "cohorts must not be null");
+        this.cohort = requireNonNull(cohort, "cohort must not be null");
     }
 
     @Override
@@ -91,148 +87,84 @@ sealed class CommitCoordinationTask implements Callable<CommitInfo> {
     }
 
     /**
-     * Invokes canCommit on underlying cohorts and blocks till all results are returned.
+     * Invokes canCommit on underlying cohort and blocks till the result is returned.
      *
      * <p>
      * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
      * IllegalStateException.
      *
-     * @throws TransactionCommitFailedException If one of cohorts failed can Commit
+     * @throws TransactionCommitFailedException If cohort fails Commit
      */
     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
     private void canCommitBlocking() throws TransactionCommitFailedException {
-        for (final ListenableFuture<?> canCommit : canCommitAll()) {
-            try {
-                final Boolean result = (Boolean)canCommit.get();
-                if (result == null || !result) {
-                    throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
-            }
+        final var future = cohort.canCommit();
+        final Boolean result;
+        try {
+            result = future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
         }
-    }
 
-    /**
-     * Invokes canCommit on underlying cohorts and returns composite future which will contain {@link Boolean#TRUE} only
-     * and only if all cohorts returned true.
-     *
-     * <p>
-     * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
-     * IllegalStateException.
-     *
-     * @return List of all cohorts futures from can commit phase.
-     */
-    private ListenableFuture<?>[] canCommitAll() {
-        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
-        int index = 0;
-        for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-            ops[index++] = cohort.canCommit();
+        if (!Boolean.TRUE.equals(result)) {
+            throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
         }
-        return ops;
     }
 
     /**
-     * Invokes preCommit on underlying cohorts and blocks until all results are returned.
+     * Invokes preCommit on underlying cohort and blocks until the result is returned.
      *
      * <p>
      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
      * IllegalStateException.
      *
-     * @throws TransactionCommitFailedException If one of cohorts failed preCommit
+     * @throws TransactionCommitFailedException If cohort fails preCommit
      */
     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
     private void preCommitBlocking() throws TransactionCommitFailedException {
-        final ListenableFuture<?>[] preCommitFutures = preCommitAll();
         try {
-            for (final ListenableFuture<?> future : preCommitFutures) {
-                future.get();
-            }
+            cohort.preCommit().get();
         } catch (InterruptedException | ExecutionException e) {
             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
         }
     }
 
     /**
-     * Invokes preCommit on underlying cohorts and returns future which will complete once all preCommit on cohorts
-     * completed or failed.
-     *
-     * <p>
-     * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
-     * IllegalStateException.
-     *
-     * @return List of all cohorts futures from can commit phase.
-     */
-    private ListenableFuture<?>[] preCommitAll() {
-        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
-        int index = 0;
-        for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-            ops[index++] = cohort.preCommit();
-        }
-        return ops;
-    }
-
-    /**
-     * Invokes commit on underlying cohorts and blocks until all results are returned.
+     * Invokes commit on underlying cohort and blocks until result is returned.
      *
      * <p>
      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws IllegalStateException.
      *
-     * @throws TransactionCommitFailedException If one of cohorts failed preCommit
+     * @throws TransactionCommitFailedException If cohort fails preCommit
      */
     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
     private void commitBlocking() throws TransactionCommitFailedException {
-        final ListenableFuture<?>[] commitFutures = commitAll();
         try {
-            for (final ListenableFuture<?> future : commitFutures) {
-                future.get();
-            }
+            cohort.commit().get();
         } catch (InterruptedException | ExecutionException e) {
             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
         }
     }
 
-    /**
-     * Invokes commit on underlying cohorts and returns future which
-     * completes
-     * once all commits on cohorts are completed.
-     *
-     *<p>
-     * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
-     * IllegalStateException
-     *
-     * @return List of all cohorts futures from can commit phase.
-     */
-    private ListenableFuture<?>[] commitAll() {
-        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
-        int index = 0;
-        for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-            ops[index++] = cohort.commit();
-        }
-        return ops;
-    }
-
     /**
      * Aborts transaction.
      *
      * <p>
-     * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all cohorts, blocks for all results. If any
-     * of the abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
+     * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on underlying cohort, blocks the results. If
+     * abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
      *
      * <p>
-     * If aborts we're successful throws supplied exception
+     * If abort was successful throws supplied exception
      *
      * @param originalCause Exception which should be used to fail transaction for consumers of transaction future
      *                      and listeners of transaction failure.
-     * @param phase phase in which the problem ensued
-     * @throws TransactionCommitFailedException on invocation of this method.
      * @throws IllegalStateException if abort failed.
+     * @throws TransactionCommitFailedException on invocation of this method.
      */
     private void abortBlocking(final TransactionCommitFailedException originalCause)
             throws TransactionCommitFailedException {
         Exception cause = originalCause;
         try {
-            abortAsyncAll().get();
+            cohort.abort().get();
         } catch (InterruptedException | ExecutionException e) {
             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
             cause = new IllegalStateException("Abort failed.", e);
@@ -240,26 +172,4 @@ sealed class CommitCoordinationTask implements Callable<CommitInfo> {
         }
         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
     }
-
-    /**
-     * Invokes abort on underlying cohorts and returns future which completes once all abort on cohorts are completed.
-     *
-     * @return Future which will complete once all cohorts completed abort.
-     */
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private ListenableFuture<Void> abortAsyncAll() {
-
-        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
-        int index = 0;
-        for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-            ops[index++] = cohort.abort();
-        }
-
-        /*
-         * We are returning all futures as list, not only succeeded ones in
-         * order to fail composite future if any of them failed.
-         * See Futures.allAsList for this description.
-         */
-        return (ListenableFuture) Futures.allAsList(ops);
-    }
 }