Do not allow multi-datastore transactions 81/103181/3
authorRuslan Kashapov <ruslan.kashapov@pantheon.tech>
Wed, 9 Nov 2022 17:16:58 +0000 (19:16 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 15 Nov 2022 17:07:07 +0000 (18:07 +0100)
The ability to access multiple datastores from the same transaction
has been long-deprecated. This patch disables that ability, binding
each transaction to the datastore it first accesses.

JIRA: CONTROLLER-2055
Change-Id: I57fed3daf2ae9cd0cc6f4899fe1975c05def5c46
Signed-off-by: Ruslan Kashapov <ruslan.kashapov@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBrokerTest.java

index 2655b61f682e526a6205db89b9fe1da10e1821f6..f75707ccbf4082876e4802cc81a5a9abaa720f9c 100644 (file)
@@ -12,22 +12,37 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
-import java.util.Collection;
-import java.util.EnumMap;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.Map;
+import java.util.Map.Entry;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionDatastoreMismatchException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory;
 
 public abstract class AbstractDOMBrokerTransaction<T extends DOMStoreTransaction> implements DOMDataTreeTransaction {
 
-    private final EnumMap<LogicalDatastoreType, T> backingTxs;
-    private final Object identifier;
+    private static final VarHandle BACKING_TX;
+
+    static {
+        try {
+            BACKING_TX = MethodHandles.lookup()
+                .findVarHandle(AbstractDOMBrokerTransaction.class, "backingTx", Entry.class);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    private final @NonNull Object identifier;
     private final Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories;
 
+    private volatile Entry<LogicalDatastoreType, T> backingTx;
+
     /**
-     * Creates new composite Transactions.
+     * Creates new transaction.
      *
      * @param identifier Identifier of transaction.
      */
@@ -35,70 +50,68 @@ public abstract class AbstractDOMBrokerTransaction<T extends DOMStoreTransaction
             Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
         this.identifier = requireNonNull(identifier, "Identifier should not be null");
         this.storeTxFactories = requireNonNull(storeTxFactories, "Store Transaction Factories should not be null");
-        this.backingTxs = new EnumMap<>(LogicalDatastoreType.class);
+        checkArgument(!storeTxFactories.isEmpty(), "Store Transaction Factories should not be empty");
     }
 
     /**
-     * Returns subtransaction associated with supplied key.
+     * Returns sub-transaction associated with supplied key.
      *
-     * @param key the data store type key
-     * @return the subtransaction
-     * @throws NullPointerException
-     *             if key is null
-     * @throws IllegalArgumentException
-     *             if no subtransaction is associated with key.
+     * @param datastoreType the data store type
+     * @return the sub-transaction
+     * @throws NullPointerException                  if datastoreType is null
+     * @throws IllegalArgumentException              if no sub-transaction is associated with datastoreType.
+     * @throws TransactionDatastoreMismatchException if datastoreType mismatches the one used at first access
      */
-    protected final T getSubtransaction(final LogicalDatastoreType key) {
-        requireNonNull(key, "key must not be null.");
+    protected final T getSubtransaction(final LogicalDatastoreType datastoreType) {
+        requireNonNull(datastoreType, "datastoreType must not be null.");
 
-        T ret = backingTxs.get(key);
-        if (ret == null) {
-            ret = createTransaction(key);
-            backingTxs.put(key, ret);
+        var entry = backingTx;
+        if (entry == null) {
+            if (!storeTxFactories.containsKey(datastoreType)) {
+                throw new IllegalArgumentException(datastoreType + " is not supported");
+            }
+            final var tx = createTransaction(datastoreType);
+            final var newEntry = Map.entry(datastoreType, tx);
+            final var witness = (Entry<LogicalDatastoreType, T>) BACKING_TX.compareAndExchange(this, null, newEntry);
+            if (witness != null) {
+                tx.close();
+                entry = witness;
+            } else {
+                entry = newEntry;
+            }
         }
-        checkArgument(ret != null, "No subtransaction associated with %s", key);
-        return ret;
-    }
 
-    protected abstract T createTransaction(LogicalDatastoreType key);
+        final var expected = entry.getKey();
+        if (expected != datastoreType) {
+            throw new TransactionDatastoreMismatchException(expected, datastoreType);
+        }
+        return entry.getValue();
+    }
 
     /**
-     * Returns immutable Iterable of all subtransactions.
-     *
+     * Returns sub-transaction if initialized.
      */
-    protected Collection<T> getSubtransactions() {
-        return backingTxs.values();
+    protected T getSubtransaction() {
+        final Entry<LogicalDatastoreType, T> entry;
+        return (entry = backingTx) == null ? null : entry.getValue();
     }
 
+    protected abstract T createTransaction(LogicalDatastoreType datastoreType);
+
     @Override
     public Object getIdentifier() {
         return identifier;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    protected void closeSubtransactions() {
-        /*
-         * We share one exception for all failures, which are added
-         * as supressedExceptions to it.
-         */
-        IllegalStateException failure = null;
-        for (T subtransaction : backingTxs.values()) {
+    protected void closeSubtransaction() {
+        if (backingTx != null) {
             try {
-                subtransaction.close();
+                backingTx.getValue().close();
             } catch (Exception e) {
-                // If we did not allocated failure we allocate it
-                if (failure == null) {
-                    failure = new IllegalStateException("Uncaught exception occured during closing transaction", e);
-                } else {
-                    // We update it with additional exceptions, which occurred during error.
-                    failure.addSuppressed(e);
-                }
+                throw new IllegalStateException("Uncaught exception occurred during closing transaction", e);
             }
         }
-        // If we have failure, we throw it at after all attempts to close.
-        if (failure != null) {
-            throw failure;
-        }
     }
 
     protected DOMStoreTransactionFactory getTxFactory(LogicalDatastoreType type) {
index 02e9e047f4db93eb30380ac19bca7ecbfbea4de6..5058fbe75f5a53b4f13d7a15632dc7d93d26889d 100644 (file)
@@ -10,12 +10,11 @@ package org.opendaylight.controller.cluster.databroker;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
+import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFailedFluentFuture;
 
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.Futures;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -23,7 +22,6 @@ import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.broker.TransactionCommitFailedExceptionMapper;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -108,7 +106,7 @@ public abstract class AbstractDOMBrokerWriteTransaction<T extends DOMStoreWriteT
         if (impl != null) {
             LOG.trace("Transaction {} cancelled before submit", getIdentifier());
             FUTURE_UPDATER.lazySet(this, CANCELLED_FUTURE);
-            closeSubtransactions();
+            closeSubtransaction();
             return true;
         }
 
@@ -129,20 +127,18 @@ public abstract class AbstractDOMBrokerWriteTransaction<T extends DOMStoreWriteT
         final AbstractDOMTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
         checkRunning(impl);
 
-        final Collection<T> txns = getSubtransactions();
-        final Collection<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(txns.size());
-
         FluentFuture<? extends CommitInfo> ret;
-        try {
-            for (final T txn : txns) {
-                cohorts.add(txn.ready());
+        final var tx = getSubtransaction();
+        if (tx == null) {
+            ret = CommitInfo.emptyFluentFuture();
+        } else {
+            try {
+                ret = impl.commit(this, tx.ready());
+            } catch (RuntimeException e) {
+                ret = immediateFailedFluentFuture(TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e));
             }
-
-            ret = impl.commit(this, cohorts);
-        } catch (RuntimeException e) {
-            ret = FluentFuture.from(Futures.immediateFailedFuture(
-                    TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e)));
         }
+
         FUTURE_UPDATER.lazySet(this, ret);
         return ret;
     }
index 4d488a1873a9178b0781caf1e141e0ee12a9678a..e41df8dfa2ae62f6494e4aae3d3fdc2088861410 100644 (file)
@@ -12,7 +12,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FluentFuture;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
-import java.util.Collection;
 import java.util.EnumMap;
 import java.util.Map;
 import org.opendaylight.mdsal.common.api.CommitInfo;
@@ -57,11 +56,11 @@ public abstract class AbstractDOMTransactionFactory<T extends DOMStoreTransactio
      * Submits a transaction asynchronously for commit.
      *
      * @param transaction the transaction to submit
-     * @param cohorts the associated cohorts
+     * @param cohort the associated cohort
      * @return a resulting Future
      */
     protected abstract FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction transaction,
-            Collection<DOMStoreThreePhaseCommitCohort> cohorts);
+            DOMStoreThreePhaseCommitCohort cohort);
 
     /**
      * Creates a new read-only transaction.
index 1738d1d3092ac2fba554f33489a9557a97d7599d..cef367fd3ed3921192d4344c335fcf999c77efd3 100644 (file)
@@ -18,11 +18,7 @@ import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@@ -79,118 +75,75 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
 
     @Override
     protected FluentFuture<? extends CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
-            final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+            final DOMStoreThreePhaseCommitCohort cohort) {
 
         checkArgument(transaction != null, "Transaction must not be null.");
-        checkArgument(cohorts != null, "Cohorts must not be null.");
+        checkArgument(cohort != null, "Cohorts must not be null.");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
 
-        if (cohorts.isEmpty()) {
-            return CommitInfo.emptyFluentFuture();
-        }
-
         final AsyncNotifyingSettableFuture clientSubmitFuture =
                 new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
 
-        doCanCommit(clientSubmitFuture, transaction, cohorts);
-
+        doCanCommit(clientSubmitFuture, transaction, cohort);
         return FluentFuture.from(clientSubmitFuture);
     }
 
     private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataTreeWriteTransaction transaction,
-            final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
-
+            final DOMStoreThreePhaseCommitCohort cohort) {
         final long startTime = System.nanoTime();
 
-        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
-
-        // Not using Futures.allAsList here to avoid its internal overhead.
-        FutureCallback<Boolean> futureCallback = new FutureCallback<>() {
+        Futures.addCallback(cohort.canCommit(), new FutureCallback<>() {
             @Override
             public void onSuccess(final Boolean result) {
                 if (result == null || !result) {
-                    handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER,
-                            new TransactionCommitFailedException("Can Commit failed, no detailed cause available."));
-                } else if (!cohortIterator.hasNext()) {
-                    // All cohorts completed successfully - we can move on to the preCommit phase
-                    doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                    onFailure(new TransactionCommitFailedException("Can Commit failed, no detailed cause available."));
                 } else {
-                    Futures.addCallback(cohortIterator.next().canCommit(), this, MoreExecutors.directExecutor());
+                    doPreCommit(startTime, clientSubmitFuture, transaction, cohort);
                 }
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure);
+                handleException(clientSubmitFuture, transaction, cohort, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure);
             }
-        };
-
-        Futures.addCallback(cohortIterator.next().canCommit(), futureCallback, MoreExecutors.directExecutor());
+        }, MoreExecutors.directExecutor());
     }
 
     private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
-            final DOMDataTreeWriteTransaction transaction,
-            final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
-
-        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
-
-        // Not using Futures.allAsList here to avoid its internal overhead.
-        FutureCallback<Empty> futureCallback = new FutureCallback<>() {
+            final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
+        Futures.addCallback(cohort.preCommit(), new FutureCallback<>() {
             @Override
             public void onSuccess(final Empty result) {
-                if (!cohortIterator.hasNext()) {
-                    // All cohorts completed successfully - we can move on to the commit phase
-                    doCommit(startTime, clientSubmitFuture, transaction, cohorts);
-                } else {
-                    Futures.addCallback(cohortIterator.next().preCommit(), this, MoreExecutors.directExecutor());
-                }
+                doCommit(startTime, clientSubmitFuture, transaction, cohort);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT, PRE_COMMIT_MAPPER, failure);
+                handleException(clientSubmitFuture, transaction, cohort, PRE_COMMIT, PRE_COMMIT_MAPPER, failure);
             }
-        };
-
-        Futures.addCallback(cohortIterator.next().preCommit(), futureCallback, MoreExecutors.directExecutor());
+        }, MoreExecutors.directExecutor());
     }
 
     private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
-            final DOMDataTreeWriteTransaction transaction,
-            final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
-
-        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
-
-        // Not using Futures.allAsList here to avoid its internal overhead.
-        final FutureCallback<CommitInfo> futureCallback = new FutureCallback<>() {
+            final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
+        Futures.addCallback(cohort.commit(), new FutureCallback<CommitInfo>() {
             @Override
             public void onSuccess(final CommitInfo result) {
-                if (!cohortIterator.hasNext()) {
-                    // All cohorts completed successfully - we're done.
-                    commitStatsTracker.addDuration(System.nanoTime() - startTime);
-
-                    clientSubmitFuture.set();
-                } else {
-                    Futures.addCallback(cohortIterator.next().commit(), this, MoreExecutors.directExecutor());
-                }
+                commitStatsTracker.addDuration(System.nanoTime() - startTime);
+                clientSubmitFuture.set();
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
-                handleException(clientSubmitFuture, transaction, cohorts, COMMIT, COMMIT_ERROR_MAPPER, throwable);
+                handleException(clientSubmitFuture, transaction, cohort, COMMIT, COMMIT_ERROR_MAPPER, throwable);
             }
-        };
-
-        Futures.addCallback(cohortIterator.next().commit(), futureCallback, MoreExecutors.directExecutor());
+        }, MoreExecutors.directExecutor());
     }
 
     private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
-            final DOMDataTreeWriteTransaction transaction,
-            final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
-            final String phase, final TransactionCommitFailedExceptionMapper exMapper,
-            final Throwable throwable) {
-
+            final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort,
+            final String phase, final TransactionCommitFailedExceptionMapper exMapper, final Throwable throwable) {
         if (clientSubmitFuture.isDone()) {
             // We must have had failures from multiple cohorts.
             return;
@@ -199,29 +152,21 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
         // Use debug instead of warn level here because this exception gets propagate back to the caller via the Future
         LOG.debug("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, throwable);
 
-        // Transaction failed - tell all cohorts to abort.
-        @SuppressWarnings("unchecked")
-        ListenableFuture<Empty>[] canCommitFutures = new ListenableFuture[cohorts.size()];
-        int index = 0;
-        for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-            canCommitFutures[index++] = cohort.abort();
-        }
-
         // Propagate the original exception
         final Exception e;
         if (throwable instanceof NoShardLeaderException || throwable instanceof ShardLeaderNotRespondingException) {
             e = new DataStoreUnavailableException(throwable.getMessage(), throwable);
         } else if (throwable instanceof Exception) {
-            e = (Exception)throwable;
+            e = (Exception) throwable;
         } else {
             e = new RuntimeException("Unexpected error occurred", throwable);
         }
         clientSubmitFuture.setException(exMapper.apply(e));
 
-        ListenableFuture<List<Empty>> combinedFuture = Futures.allAsList(canCommitFutures);
-        Futures.addCallback(combinedFuture, new FutureCallback<List<Empty>>() {
+        // abort
+        Futures.addCallback(cohort.abort(), new FutureCallback<Empty>() {
             @Override
-            public void onSuccess(final List<Empty> result) {
+            public void onSuccess(final Empty result) {
                 // Propagate the original exception to the client.
                 LOG.debug("Tx: {} aborted successfully", transaction.getIdentifier());
             }
index f15fd8679e52b72b4b506488eb33ee379f3695c5..12e6727df8e20975aeb1d8ed5f7eb744352e6e17 100644 (file)
@@ -13,7 +13,6 @@ import static java.util.Objects.requireNonNull;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
@@ -81,11 +80,11 @@ final class DOMBrokerTransactionChain extends AbstractDOMTransactionFactory<DOMS
 
     @Override
     public FluentFuture<? extends CommitInfo> commit(
-            final DOMDataTreeWriteTransaction transaction, final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+            final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
         checkNotFailed();
         checkNotClosed();
 
-        final FluentFuture<? extends CommitInfo> ret = broker.commit(transaction, cohorts);
+        final FluentFuture<? extends CommitInfo> ret = broker.commit(transaction, cohort);
 
         COUNTER_UPDATER.incrementAndGet(this);
         ret.addCallback(new FutureCallback<CommitInfo>() {
index e77f6456dedc24a4a46ffb7353b7dc6e2d358b1a..d8b1c3bf3996df5ddce35bd0e4c67f0be387492a 100644 (file)
@@ -6,15 +6,18 @@
  */
 package org.opendaylight.controller.cluster.databroker;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
+import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
 
 import com.google.common.util.concurrent.FluentFuture;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -22,10 +25,15 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.TransactionDatastoreMismatchException;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class AbstractDOMBrokerWriteTransactionTest {
+
+    @Mock
+    private DOMStoreTransactionFactory txFactory;
     @Mock
     private AbstractDOMTransactionFactory<?> abstractDOMTransactionFactory;
     @Mock
@@ -35,36 +43,36 @@ public class AbstractDOMBrokerWriteTransactionTest {
             extends AbstractDOMBrokerWriteTransaction<DOMStoreWriteTransaction> {
 
         AbstractDOMBrokerWriteTransactionTestImpl() {
-            super(new Object(), Collections.emptyMap(), abstractDOMTransactionFactory);
+            this(Map.of(CONFIGURATION, txFactory));
+        }
+
+        AbstractDOMBrokerWriteTransactionTestImpl(Map<LogicalDatastoreType, DOMStoreTransactionFactory> txFactoryMap) {
+            super(new Object(), txFactoryMap, abstractDOMTransactionFactory);
         }
 
         @Override
         protected DOMStoreWriteTransaction createTransaction(final LogicalDatastoreType key) {
-            return null;
+            return domStoreWriteTransaction;
         }
 
         @Override
-        protected Collection<DOMStoreWriteTransaction> getSubtransactions() {
-            return Collections.singletonList(domStoreWriteTransaction);
+        protected DOMStoreWriteTransaction getSubtransaction() {
+            return domStoreWriteTransaction;
         }
     }
 
     @Test
-    public void readyRuntimeExceptionAndCancel() throws InterruptedException {
+    public void readyRuntimeExceptionAndCancel() {
         RuntimeException thrown = new RuntimeException();
         doThrow(thrown).when(domStoreWriteTransaction).ready();
         AbstractDOMBrokerWriteTransactionTestImpl abstractDOMBrokerWriteTransactionTestImpl =
                 new AbstractDOMBrokerWriteTransactionTestImpl();
 
         FluentFuture<? extends CommitInfo> submitFuture = abstractDOMBrokerWriteTransactionTestImpl.commit();
-        try {
-            submitFuture.get();
-            Assert.fail("TransactionCommitFailedException expected");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionCommitFailedException);
-            assertTrue(e.getCause().getCause() == thrown);
-            abstractDOMBrokerWriteTransactionTestImpl.cancel();
-        }
+        final var cause = assertThrows(ExecutionException.class, submitFuture::get).getCause();
+        assertTrue(cause instanceof TransactionCommitFailedException);
+        assertSame(thrown, cause.getCause());
+        abstractDOMBrokerWriteTransactionTestImpl.cancel();
     }
 
     @Test
@@ -75,13 +83,31 @@ public class AbstractDOMBrokerWriteTransactionTest {
                 = new AbstractDOMBrokerWriteTransactionTestImpl();
 
         FluentFuture<? extends CommitInfo> submitFuture = abstractDOMBrokerWriteTransactionTestImpl.commit();
-        try {
-            submitFuture.get();
-            Assert.fail("TransactionCommitFailedException expected");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TransactionCommitFailedException);
-            assertTrue(e.getCause().getCause() == thrown);
-            abstractDOMBrokerWriteTransactionTestImpl.cancel();
-        }
+        final var cause = assertThrows(ExecutionException.class, submitFuture::get).getCause();
+        assertTrue(cause instanceof TransactionCommitFailedException);
+        assertSame(thrown, cause.getCause());
+        abstractDOMBrokerWriteTransactionTestImpl.cancel();
+    }
+
+    @Test
+    public void getSubtransactionStoreMismatch() {
+        final var testTx = new AbstractDOMBrokerWriteTransactionTestImpl(
+                Map.of(CONFIGURATION, txFactory, OPERATIONAL, txFactory));
+
+        assertEquals(domStoreWriteTransaction, testTx.getSubtransaction(CONFIGURATION));
+
+        final var exception = assertThrows(TransactionDatastoreMismatchException.class,
+                () -> testTx.getSubtransaction(OPERATIONAL));
+        assertEquals(CONFIGURATION, exception.expected());
+        assertEquals(OPERATIONAL, exception.encountered());
+    }
+
+    @Test
+    public void getSubtransactionStoreUndefined() {
+        final var testTx = new AbstractDOMBrokerWriteTransactionTestImpl(Map.of(OPERATIONAL, txFactory));
+
+        final var exception = assertThrows(IllegalArgumentException.class,
+                () -> testTx.getSubtransaction(CONFIGURATION));
+        assertEquals("CONFIGURATION is not supported", exception.getMessage());
     }
 }
index 482045181a4b7a120f500cab1d2c605dd03129c4..346578f949df5edc9a6f41eeb56ee24d17bc20da 100644 (file)
@@ -34,8 +34,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -50,9 +48,7 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension;
@@ -84,8 +80,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 public class ConcurrentDOMDataBrokerTest {
 
     private final DOMDataTreeWriteTransaction transaction = mock(DOMDataTreeWriteTransaction.class);
-    private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
-    private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
+    private final DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
     private final ThreadPoolExecutor futureExecutor =
             new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<>());
     private ConcurrentDOMDataBroker coordinator;
@@ -121,8 +116,7 @@ public class ConcurrentDOMDataBrokerTest {
             final SettableFuture<Boolean> future = SettableFuture.create();
             if (doAsync) {
                 new Thread(() -> {
-                    Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
-                            10, TimeUnit.SECONDS);
+                    Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue, 10, TimeUnit.SECONDS);
                     future.set(Boolean.TRUE);
                 }).start();
             } else {
@@ -132,16 +126,11 @@ public class ConcurrentDOMDataBrokerTest {
             return future;
         };
 
-        doAnswer(asyncCanCommit).when(mockCohort1).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).commit();
+        doAnswer(asyncCanCommit).when(mockCohort).canCommit();
+        doReturn(immediateNullFluentFuture()).when(mockCohort).preCommit();
+        doReturn(immediateNullFluentFuture()).when(mockCohort).commit();
 
-        doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).preCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).commit();
-
-        ListenableFuture<? extends CommitInfo> future =
-                coordinator.commit(transaction, Arrays.asList(mockCohort1, mockCohort2));
+        ListenableFuture<? extends CommitInfo> future = coordinator.commit(transaction, mockCohort);
 
         final CountDownLatch doneLatch = new CountDownLatch(1);
         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
@@ -169,35 +158,22 @@ public class ConcurrentDOMDataBrokerTest {
 
         assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
 
-        InOrder inOrder = inOrder(mockCohort1, mockCohort2);
-        inOrder.verify(mockCohort1).canCommit();
-        inOrder.verify(mockCohort2).canCommit();
-        inOrder.verify(mockCohort1).preCommit();
-        inOrder.verify(mockCohort2).preCommit();
-        inOrder.verify(mockCohort1).commit();
-        inOrder.verify(mockCohort2).commit();
+        InOrder inOrder = inOrder(mockCohort);
+        inOrder.verify(mockCohort, times(1)).canCommit();
+        inOrder.verify(mockCohort, times(1)).preCommit();
+        inOrder.verify(mockCohort, times(1)).commit();
     }
 
     @Test
     public void testSubmitWithNegativeCanCommitResponse() throws Exception {
-        doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-
-        doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort2).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
-        DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
-        doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort3).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
-
-        ListenableFuture<? extends CommitInfo> future = coordinator.commit(
-                transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
+        doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort).canCommit();
+        doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
 
-        assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
+        assertFailure(coordinator.commit(transaction, mockCohort), null, mockCohort);
     }
 
     private static void assertFailure(final ListenableFuture<?> future, final Exception expCause,
-            final DOMStoreThreePhaseCommitCohort... mockCohorts) throws Exception {
+            final DOMStoreThreePhaseCommitCohort mockCohort) throws Exception {
         try {
             future.get(5, TimeUnit.SECONDS);
             fail("Expected TransactionCommitFailedException");
@@ -206,11 +182,7 @@ public class ConcurrentDOMDataBrokerTest {
             if (expCause != null) {
                 assertSame("Expected cause", expCause.getClass(), tcf.getCause().getClass());
             }
-
-            InOrder inOrder = inOrder((Object[])mockCohorts);
-            for (DOMStoreThreePhaseCommitCohort c: mockCohorts) {
-                inOrder.verify(c).abort();
-            }
+            verify(mockCohort, times(1)).abort();
         } catch (TimeoutException e) {
             throw e;
         }
@@ -218,97 +190,42 @@ public class ConcurrentDOMDataBrokerTest {
 
     @Test
     public void testSubmitWithCanCommitException() throws Exception {
-        doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-
-        IllegalStateException cause = new IllegalStateException("mock");
-        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
-        FluentFuture<? extends CommitInfo> future = coordinator.commit(
-                transaction, Arrays.asList(mockCohort1, mockCohort2));
+        final Exception cause = new IllegalStateException("mock");
+        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).canCommit();
+        doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
 
-        assertFailure(future, cause, mockCohort1, mockCohort2);
-    }
-
-    @Test
-    public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception {
-        doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-        NoShardLeaderException rootCause = new NoShardLeaderException("mock");
-        DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause);
-        doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
-        FluentFuture<? extends CommitInfo> future = coordinator.commit(
-            transaction, Arrays.asList(mockCohort1, mockCohort2));
-
-        assertFailure(future, cause, mockCohort1, mockCohort2);
+        assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
     }
 
     @Test
     public void testSubmitWithPreCommitException() throws Exception {
-        doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-
-        doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
-        IllegalStateException cause = new IllegalStateException("mock");
-        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
-        DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
-        doReturn(immediateTrueFluentFuture()).when(mockCohort3).canCommit();
-        doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2")))
-                .when(mockCohort3).preCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
-
-        FluentFuture<? extends CommitInfo> future = coordinator.commit(
-                transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
+        doReturn(immediateTrueFluentFuture()).when(mockCohort).canCommit();
+        final IllegalStateException cause = new IllegalStateException("mock");
+        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).preCommit();
+        doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
 
-        assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
+        assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
     }
 
     @Test
     public void testSubmitWithCommitException() throws Exception {
-        doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).commit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-
-        doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).preCommit();
-        IllegalStateException cause = new IllegalStateException("mock");
-        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
-        DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
-        doReturn(immediateTrueFluentFuture()).when(mockCohort3).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort3).preCommit();
-        doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2")))
-                .when(mockCohort3).commit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
-
-        FluentFuture<? extends CommitInfo> future = coordinator.commit(
-                transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
-
-        assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
+        doReturn(immediateTrueFluentFuture()).when(mockCohort).canCommit();
+        doReturn(immediateNullFluentFuture()).when(mockCohort).preCommit();
+        final IllegalStateException cause = new IllegalStateException("mock");
+        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).commit();
+        doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
+
+        assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
     }
 
     @Test
     public void testSubmitWithAbortException() throws Exception {
-        doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
-        doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error")))
-                .when(mockCohort1).abort();
+        final Exception canCommitCause = new IllegalStateException("canCommit error");
+        doReturn(Futures.immediateFailedFuture(canCommitCause)).when(mockCohort).canCommit();
+        final Exception abortCause = new IllegalStateException("abort error");
+        doReturn(Futures.immediateFailedFuture(abortCause)).when(mockCohort).abort();
 
-        IllegalStateException cause = new IllegalStateException("mock canCommit error");
-        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
-        FluentFuture<? extends CommitInfo> future = coordinator.commit(
-                transaction, Arrays.asList(mockCohort1, mockCohort2));
-
-        assertFailure(future, cause, mockCohort1, mockCohort2);
+        assertFailure(coordinator.commit(transaction, mockCohort), canCommitCause, mockCohort);
     }
 
     @Test
@@ -367,11 +284,6 @@ public class ConcurrentDOMDataBrokerTest {
 
             verify(configDomStore, never()).newReadWriteTransaction();
             verify(operationalDomStore, times(1)).newReadWriteTransaction();
-
-            dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
-
-            verify(configDomStore, times(1)).newReadWriteTransaction();
-            verify(operationalDomStore, times(1)).newReadWriteTransaction();
         }
 
     }
@@ -395,11 +307,6 @@ public class ConcurrentDOMDataBrokerTest {
 
             verify(configDomStore, never()).newWriteOnlyTransaction();
             verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
-
-            dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
-
-            verify(configDomStore, times(1)).newWriteOnlyTransaction();
-            verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
         }
     }
 
@@ -422,11 +329,6 @@ public class ConcurrentDOMDataBrokerTest {
 
             verify(configDomStore, never()).newReadOnlyTransaction();
             verify(operationalDomStore, times(1)).newReadOnlyTransaction();
-
-            dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty());
-
-            verify(configDomStore, times(1)).newReadOnlyTransaction();
-            verify(operationalDomStore, times(1)).newReadOnlyTransaction();
         }
     }
 
@@ -435,7 +337,6 @@ public class ConcurrentDOMDataBrokerTest {
         DOMStore configDomStore = mock(DOMStore.class);
         DOMStore operationalDomStore = mock(DOMStore.class);
         DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
-        DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
 
         doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
         doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
@@ -450,10 +351,10 @@ public class ConcurrentDOMDataBrokerTest {
                 configDomStore), futureExecutor) {
             @Override
             public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
-                    Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
-                commitCohorts.addAll(cohorts);
+                    DOMStoreThreePhaseCommitCohort cohort) {
+                commitCohorts.add(cohort);
                 latch.countDown();
-                return super.commit(writeTx, cohorts);
+                return super.commit(writeTx, cohort);
             }
         }) {
             DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
@@ -468,56 +369,6 @@ public class ConcurrentDOMDataBrokerTest {
         }
     }
 
-    @Test
-    public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException {
-        DOMStore configDomStore = mock(DOMStore.class);
-        DOMStore operationalDomStore = mock(DOMStore.class);
-        DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
-        DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class);
-        DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class);
-        DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class);
-
-        doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction();
-        doReturn(configTransaction).when(configDomStore).newReadWriteTransaction();
-
-        doReturn(mockCohortOperational).when(operationalTransaction).ready();
-        doReturn(immediateFalseFluentFuture()).when(mockCohortOperational).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohortOperational).abort();
-
-        doReturn(mockCohortConfig).when(configTransaction).ready();
-        doReturn(immediateFalseFluentFuture()).when(mockCohortConfig).canCommit();
-        doReturn(immediateNullFluentFuture()).when(mockCohortConfig).abort();
-
-        final CountDownLatch latch = new CountDownLatch(1);
-        final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
-
-        try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
-                LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
-                configDomStore), futureExecutor) {
-            @Override
-            @SuppressWarnings("checkstyle:hiddenField")
-            public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
-                    Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
-                commitCohorts.addAll(cohorts);
-                latch.countDown();
-                return super.commit(writeTx, cohorts);
-            }
-        }) {
-            DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
-
-            domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(),
-                    mock(NormalizedNode.class));
-            domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(),
-                    mock(NormalizedNode.class));
-
-            domDataReadWriteTransaction.commit();
-
-            assertTrue(latch.await(10, TimeUnit.SECONDS));
-
-            assertTrue(commitCohorts.size() == 2);
-        }
-    }
-
     @Test
     public void testCreateTransactionChain() {
         DOMStore domStore = mock(DOMStore.class);
@@ -590,16 +441,16 @@ public class ConcurrentDOMDataBrokerTest {
             assertNotNull(supportedExtensions.getInstance(DOMDataTreeChangeService.class));
 
             DOMDataTreeCommitCohortRegistry cohortRegistry = supportedExtensions.getInstance(
-                DOMDataTreeCommitCohortRegistry.class);
+                    DOMDataTreeCommitCohortRegistry.class);
             assertNotNull(cohortRegistry);
 
-            DOMDataTreeCommitCohort mockCohort = mock(DOMDataTreeCommitCohort.class);
+            DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
             DOMDataTreeIdentifier path = new DOMDataTreeIdentifier(
                     org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION,
                     YangInstanceIdentifier.empty());
-            cohortRegistry.registerCommitCohort(path, mockCohort);
+            cohortRegistry.registerCommitCohort(path, cohort);
 
-            verify(mockConfigStore).registerCommitCohort(path, mockCohort);
+            verify(mockConfigStore).registerCommitCohort(path, cohort);
         }
     }
 }