From 18991f44b807ab6f06fcec76216b7f70b900b0f4 Mon Sep 17 00:00:00 2001 From: Ruslan Kashapov Date: Wed, 9 Nov 2022 19:16:58 +0200 Subject: [PATCH] Do not allow multi-datastore transactions The ability to access multiple datastores from the same transaction has been long-deprecated. This patch disables that ability, binding each transaction to the datastore it first accesses. JIRA: CONTROLLER-2055 Change-Id: I57fed3daf2ae9cd0cc6f4899fe1975c05def5c46 Signed-off-by: Ruslan Kashapov Signed-off-by: Robert Varga --- .../AbstractDOMBrokerTransaction.java | 105 ++++---- .../AbstractDOMBrokerWriteTransaction.java | 26 +- .../AbstractDOMTransactionFactory.java | 5 +- .../databroker/ConcurrentDOMDataBroker.java | 107 ++------ .../DOMBrokerReadOnlyTransaction.java | 2 +- .../databroker/DOMBrokerTransactionChain.java | 5 +- ...AbstractDOMBrokerWriteTransactionTest.java | 74 ++++-- .../ConcurrentDOMDataBrokerTest.java | 235 ++++-------------- 8 files changed, 194 insertions(+), 365 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java index 2655b61f68..f75707ccbf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java @@ -12,22 +12,37 @@ import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; -import java.util.Collection; -import java.util.EnumMap; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.Map; +import java.util.Map.Entry; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionDatastoreMismatchException; import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory; public abstract class AbstractDOMBrokerTransaction implements DOMDataTreeTransaction { - private final EnumMap backingTxs; - private final Object identifier; + private static final VarHandle BACKING_TX; + + static { + try { + BACKING_TX = MethodHandles.lookup() + .findVarHandle(AbstractDOMBrokerTransaction.class, "backingTx", Entry.class); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final @NonNull Object identifier; private final Map storeTxFactories; + private volatile Entry backingTx; + /** - * Creates new composite Transactions. + * Creates new transaction. * * @param identifier Identifier of transaction. */ @@ -35,70 +50,68 @@ public abstract class AbstractDOMBrokerTransaction storeTxFactories) { this.identifier = requireNonNull(identifier, "Identifier should not be null"); this.storeTxFactories = requireNonNull(storeTxFactories, "Store Transaction Factories should not be null"); - this.backingTxs = new EnumMap<>(LogicalDatastoreType.class); + checkArgument(!storeTxFactories.isEmpty(), "Store Transaction Factories should not be empty"); } /** - * Returns subtransaction associated with supplied key. + * Returns sub-transaction associated with supplied key. * - * @param key the data store type key - * @return the subtransaction - * @throws NullPointerException - * if key is null - * @throws IllegalArgumentException - * if no subtransaction is associated with key. + * @param datastoreType the data store type + * @return the sub-transaction + * @throws NullPointerException if datastoreType is null + * @throws IllegalArgumentException if no sub-transaction is associated with datastoreType. + * @throws TransactionDatastoreMismatchException if datastoreType mismatches the one used at first access */ - protected final T getSubtransaction(final LogicalDatastoreType key) { - requireNonNull(key, "key must not be null."); + protected final T getSubtransaction(final LogicalDatastoreType datastoreType) { + requireNonNull(datastoreType, "datastoreType must not be null."); - T ret = backingTxs.get(key); - if (ret == null) { - ret = createTransaction(key); - backingTxs.put(key, ret); + var entry = backingTx; + if (entry == null) { + if (!storeTxFactories.containsKey(datastoreType)) { + throw new IllegalArgumentException(datastoreType + " is not supported"); + } + final var tx = createTransaction(datastoreType); + final var newEntry = Map.entry(datastoreType, tx); + final var witness = (Entry) BACKING_TX.compareAndExchange(this, null, newEntry); + if (witness != null) { + tx.close(); + entry = witness; + } else { + entry = newEntry; + } } - checkArgument(ret != null, "No subtransaction associated with %s", key); - return ret; - } - protected abstract T createTransaction(LogicalDatastoreType key); + final var expected = entry.getKey(); + if (expected != datastoreType) { + throw new TransactionDatastoreMismatchException(expected, datastoreType); + } + return entry.getValue(); + } /** - * Returns immutable Iterable of all subtransactions. - * + * Returns sub-transaction if initialized. */ - protected Collection getSubtransactions() { - return backingTxs.values(); + protected T getSubtransaction() { + final Entry entry; + return (entry = backingTx) == null ? null : entry.getValue(); } + protected abstract T createTransaction(LogicalDatastoreType datastoreType); + @Override public Object getIdentifier() { return identifier; } @SuppressWarnings("checkstyle:IllegalCatch") - protected void closeSubtransactions() { - /* - * We share one exception for all failures, which are added - * as supressedExceptions to it. - */ - IllegalStateException failure = null; - for (T subtransaction : backingTxs.values()) { + protected void closeSubtransaction() { + if (backingTx != null) { try { - subtransaction.close(); + backingTx.getValue().close(); } catch (Exception e) { - // If we did not allocated failure we allocate it - if (failure == null) { - failure = new IllegalStateException("Uncaught exception occured during closing transaction", e); - } else { - // We update it with additional exceptions, which occurred during error. - failure.addSuppressed(e); - } + throw new IllegalStateException("Uncaught exception occurred during closing transaction", e); } } - // If we have failure, we throw it at after all attempts to close. - if (failure != null) { - throw failure; - } } protected DOMStoreTransactionFactory getTxFactory(LogicalDatastoreType type) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java index 02e9e047f4..5058fbe75f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java @@ -10,12 +10,11 @@ package org.opendaylight.controller.cluster.databroker; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; +import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFailedFluentFuture; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.Futures; -import java.util.ArrayList; -import java.util.Collection; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -23,7 +22,6 @@ import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.broker.TransactionCommitFailedExceptionMapper; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -108,7 +106,7 @@ public abstract class AbstractDOMBrokerWriteTransaction impl = IMPL_UPDATER.getAndSet(this, null); checkRunning(impl); - final Collection txns = getSubtransactions(); - final Collection cohorts = new ArrayList<>(txns.size()); - FluentFuture ret; - try { - for (final T txn : txns) { - cohorts.add(txn.ready()); + final var tx = getSubtransaction(); + if (tx == null) { + ret = CommitInfo.emptyFluentFuture(); + } else { + try { + ret = impl.commit(this, tx.ready()); + } catch (RuntimeException e) { + ret = immediateFailedFluentFuture(TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e)); } - - ret = impl.commit(this, cohorts); - } catch (RuntimeException e) { - ret = FluentFuture.from(Futures.immediateFailedFuture( - TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e))); } + FUTURE_UPDATER.lazySet(this, ret); return ret; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java index 4d488a1873..e41df8dfa2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java @@ -12,7 +12,6 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FluentFuture; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; -import java.util.Collection; import java.util.EnumMap; import java.util.Map; import org.opendaylight.mdsal.common.api.CommitInfo; @@ -57,11 +56,11 @@ public abstract class AbstractDOMTransactionFactory commit(DOMDataTreeWriteTransaction transaction, - Collection cohorts); + DOMStoreThreePhaseCommitCohort cohort); /** * Creates a new read-only transaction. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java index 1738d1d309..cef367fd3e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java @@ -18,11 +18,7 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; @@ -79,118 +75,75 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker { @Override protected FluentFuture commit(final DOMDataTreeWriteTransaction transaction, - final Collection cohorts) { + final DOMStoreThreePhaseCommitCohort cohort) { checkArgument(transaction != null, "Transaction must not be null."); - checkArgument(cohorts != null, "Cohorts must not be null."); + checkArgument(cohort != null, "Cohorts must not be null."); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - if (cohorts.isEmpty()) { - return CommitInfo.emptyFluentFuture(); - } - final AsyncNotifyingSettableFuture clientSubmitFuture = new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor); - doCanCommit(clientSubmitFuture, transaction, cohorts); - + doCanCommit(clientSubmitFuture, transaction, cohort); return FluentFuture.from(clientSubmitFuture); } private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture, final DOMDataTreeWriteTransaction transaction, - final Collection cohorts) { - + final DOMStoreThreePhaseCommitCohort cohort) { final long startTime = System.nanoTime(); - final Iterator cohortIterator = cohorts.iterator(); - - // Not using Futures.allAsList here to avoid its internal overhead. - FutureCallback futureCallback = new FutureCallback<>() { + Futures.addCallback(cohort.canCommit(), new FutureCallback<>() { @Override public void onSuccess(final Boolean result) { if (result == null || !result) { - handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, - new TransactionCommitFailedException("Can Commit failed, no detailed cause available.")); - } else if (!cohortIterator.hasNext()) { - // All cohorts completed successfully - we can move on to the preCommit phase - doPreCommit(startTime, clientSubmitFuture, transaction, cohorts); + onFailure(new TransactionCommitFailedException("Can Commit failed, no detailed cause available.")); } else { - Futures.addCallback(cohortIterator.next().canCommit(), this, MoreExecutors.directExecutor()); + doPreCommit(startTime, clientSubmitFuture, transaction, cohort); } } @Override public void onFailure(final Throwable failure) { - handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure); + handleException(clientSubmitFuture, transaction, cohort, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure); } - }; - - Futures.addCallback(cohortIterator.next().canCommit(), futureCallback, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, - final DOMDataTreeWriteTransaction transaction, - final Collection cohorts) { - - final Iterator cohortIterator = cohorts.iterator(); - - // Not using Futures.allAsList here to avoid its internal overhead. - FutureCallback futureCallback = new FutureCallback<>() { + final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) { + Futures.addCallback(cohort.preCommit(), new FutureCallback<>() { @Override public void onSuccess(final Empty result) { - if (!cohortIterator.hasNext()) { - // All cohorts completed successfully - we can move on to the commit phase - doCommit(startTime, clientSubmitFuture, transaction, cohorts); - } else { - Futures.addCallback(cohortIterator.next().preCommit(), this, MoreExecutors.directExecutor()); - } + doCommit(startTime, clientSubmitFuture, transaction, cohort); } @Override public void onFailure(final Throwable failure) { - handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT, PRE_COMMIT_MAPPER, failure); + handleException(clientSubmitFuture, transaction, cohort, PRE_COMMIT, PRE_COMMIT_MAPPER, failure); } - }; - - Futures.addCallback(cohortIterator.next().preCommit(), futureCallback, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, - final DOMDataTreeWriteTransaction transaction, - final Collection cohorts) { - - final Iterator cohortIterator = cohorts.iterator(); - - // Not using Futures.allAsList here to avoid its internal overhead. - final FutureCallback futureCallback = new FutureCallback<>() { + final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) { + Futures.addCallback(cohort.commit(), new FutureCallback() { @Override public void onSuccess(final CommitInfo result) { - if (!cohortIterator.hasNext()) { - // All cohorts completed successfully - we're done. - commitStatsTracker.addDuration(System.nanoTime() - startTime); - - clientSubmitFuture.set(); - } else { - Futures.addCallback(cohortIterator.next().commit(), this, MoreExecutors.directExecutor()); - } + commitStatsTracker.addDuration(System.nanoTime() - startTime); + clientSubmitFuture.set(); } @Override public void onFailure(final Throwable throwable) { - handleException(clientSubmitFuture, transaction, cohorts, COMMIT, COMMIT_ERROR_MAPPER, throwable); + handleException(clientSubmitFuture, transaction, cohort, COMMIT, COMMIT_ERROR_MAPPER, throwable); } - }; - - Futures.addCallback(cohortIterator.next().commit(), futureCallback, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture, - final DOMDataTreeWriteTransaction transaction, - final Collection cohorts, - final String phase, final TransactionCommitFailedExceptionMapper exMapper, - final Throwable throwable) { - + final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort, + final String phase, final TransactionCommitFailedExceptionMapper exMapper, final Throwable throwable) { if (clientSubmitFuture.isDone()) { // We must have had failures from multiple cohorts. return; @@ -199,29 +152,21 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker { // Use debug instead of warn level here because this exception gets propagate back to the caller via the Future LOG.debug("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, throwable); - // Transaction failed - tell all cohorts to abort. - @SuppressWarnings("unchecked") - ListenableFuture[] canCommitFutures = new ListenableFuture[cohorts.size()]; - int index = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - canCommitFutures[index++] = cohort.abort(); - } - // Propagate the original exception final Exception e; if (throwable instanceof NoShardLeaderException || throwable instanceof ShardLeaderNotRespondingException) { e = new DataStoreUnavailableException(throwable.getMessage(), throwable); } else if (throwable instanceof Exception) { - e = (Exception)throwable; + e = (Exception) throwable; } else { e = new RuntimeException("Unexpected error occurred", throwable); } clientSubmitFuture.setException(exMapper.apply(e)); - ListenableFuture> combinedFuture = Futures.allAsList(canCommitFutures); - Futures.addCallback(combinedFuture, new FutureCallback>() { + // abort + Futures.addCallback(cohort.abort(), new FutureCallback() { @Override - public void onSuccess(final List result) { + public void onSuccess(final Empty result) { // Propagate the original exception to the client. LOG.debug("Tx: {} aborted successfully", transaction.getIdentifier()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java index c4d5e1d8a5..38c46a4539 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java @@ -44,7 +44,7 @@ public class DOMBrokerReadOnlyTransaction @Override public void close() { - closeSubtransactions(); + closeSubtransaction(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java index f15fd8679e..12e6727df8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java @@ -13,7 +13,6 @@ import static java.util.Objects.requireNonNull; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.MoreExecutors; -import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; @@ -81,11 +80,11 @@ final class DOMBrokerTransactionChain extends AbstractDOMTransactionFactory commit( - final DOMDataTreeWriteTransaction transaction, final Collection cohorts) { + final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) { checkNotFailed(); checkNotClosed(); - final FluentFuture ret = broker.commit(transaction, cohorts); + final FluentFuture ret = broker.commit(transaction, cohort); COUNTER_UPDATER.incrementAndGet(this); ret.addCallback(new FutureCallback() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransactionTest.java index e77f6456de..d8b1c3bf39 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransactionTest.java @@ -6,15 +6,18 @@ */ package org.opendaylight.controller.cluster.databroker; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; +import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION; +import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL; import com.google.common.util.concurrent.FluentFuture; -import java.util.Collection; -import java.util.Collections; +import java.util.Map; import java.util.concurrent.ExecutionException; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -22,10 +25,15 @@ import org.mockito.junit.MockitoJUnitRunner; import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.common.api.TransactionDatastoreMismatchException; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class AbstractDOMBrokerWriteTransactionTest { + + @Mock + private DOMStoreTransactionFactory txFactory; @Mock private AbstractDOMTransactionFactory abstractDOMTransactionFactory; @Mock @@ -35,36 +43,36 @@ public class AbstractDOMBrokerWriteTransactionTest { extends AbstractDOMBrokerWriteTransaction { AbstractDOMBrokerWriteTransactionTestImpl() { - super(new Object(), Collections.emptyMap(), abstractDOMTransactionFactory); + this(Map.of(CONFIGURATION, txFactory)); + } + + AbstractDOMBrokerWriteTransactionTestImpl(Map txFactoryMap) { + super(new Object(), txFactoryMap, abstractDOMTransactionFactory); } @Override protected DOMStoreWriteTransaction createTransaction(final LogicalDatastoreType key) { - return null; + return domStoreWriteTransaction; } @Override - protected Collection getSubtransactions() { - return Collections.singletonList(domStoreWriteTransaction); + protected DOMStoreWriteTransaction getSubtransaction() { + return domStoreWriteTransaction; } } @Test - public void readyRuntimeExceptionAndCancel() throws InterruptedException { + public void readyRuntimeExceptionAndCancel() { RuntimeException thrown = new RuntimeException(); doThrow(thrown).when(domStoreWriteTransaction).ready(); AbstractDOMBrokerWriteTransactionTestImpl abstractDOMBrokerWriteTransactionTestImpl = new AbstractDOMBrokerWriteTransactionTestImpl(); FluentFuture submitFuture = abstractDOMBrokerWriteTransactionTestImpl.commit(); - try { - submitFuture.get(); - Assert.fail("TransactionCommitFailedException expected"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TransactionCommitFailedException); - assertTrue(e.getCause().getCause() == thrown); - abstractDOMBrokerWriteTransactionTestImpl.cancel(); - } + final var cause = assertThrows(ExecutionException.class, submitFuture::get).getCause(); + assertTrue(cause instanceof TransactionCommitFailedException); + assertSame(thrown, cause.getCause()); + abstractDOMBrokerWriteTransactionTestImpl.cancel(); } @Test @@ -75,13 +83,31 @@ public class AbstractDOMBrokerWriteTransactionTest { = new AbstractDOMBrokerWriteTransactionTestImpl(); FluentFuture submitFuture = abstractDOMBrokerWriteTransactionTestImpl.commit(); - try { - submitFuture.get(); - Assert.fail("TransactionCommitFailedException expected"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TransactionCommitFailedException); - assertTrue(e.getCause().getCause() == thrown); - abstractDOMBrokerWriteTransactionTestImpl.cancel(); - } + final var cause = assertThrows(ExecutionException.class, submitFuture::get).getCause(); + assertTrue(cause instanceof TransactionCommitFailedException); + assertSame(thrown, cause.getCause()); + abstractDOMBrokerWriteTransactionTestImpl.cancel(); + } + + @Test + public void getSubtransactionStoreMismatch() { + final var testTx = new AbstractDOMBrokerWriteTransactionTestImpl( + Map.of(CONFIGURATION, txFactory, OPERATIONAL, txFactory)); + + assertEquals(domStoreWriteTransaction, testTx.getSubtransaction(CONFIGURATION)); + + final var exception = assertThrows(TransactionDatastoreMismatchException.class, + () -> testTx.getSubtransaction(OPERATIONAL)); + assertEquals(CONFIGURATION, exception.expected()); + assertEquals(OPERATIONAL, exception.encountered()); + } + + @Test + public void getSubtransactionStoreUndefined() { + final var testTx = new AbstractDOMBrokerWriteTransactionTestImpl(Map.of(OPERATIONAL, txFactory)); + + final var exception = assertThrows(IllegalArgumentException.class, + () -> testTx.getSubtransaction(CONFIGURATION)); + assertEquals("CONFIGURATION is not supported", exception.getMessage()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBrokerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBrokerTest.java index 482045181a..346578f949 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBrokerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBrokerTest.java @@ -34,8 +34,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -50,9 +48,7 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.mdsal.common.api.CommitInfo; -import org.opendaylight.mdsal.common.api.DataStoreUnavailableException; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension; @@ -84,8 +80,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class ConcurrentDOMDataBrokerTest { private final DOMDataTreeWriteTransaction transaction = mock(DOMDataTreeWriteTransaction.class); - private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class); - private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class); + private final DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class); private final ThreadPoolExecutor futureExecutor = new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<>()); private ConcurrentDOMDataBroker coordinator; @@ -121,8 +116,7 @@ public class ConcurrentDOMDataBrokerTest { final SettableFuture future = SettableFuture.create(); if (doAsync) { new Thread(() -> { - Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue, - 10, TimeUnit.SECONDS); + Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue, 10, TimeUnit.SECONDS); future.set(Boolean.TRUE); }).start(); } else { @@ -132,16 +126,11 @@ public class ConcurrentDOMDataBrokerTest { return future; }; - doAnswer(asyncCanCommit).when(mockCohort1).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).commit(); + doAnswer(asyncCanCommit).when(mockCohort).canCommit(); + doReturn(immediateNullFluentFuture()).when(mockCohort).preCommit(); + doReturn(immediateNullFluentFuture()).when(mockCohort).commit(); - doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).preCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).commit(); - - ListenableFuture future = - coordinator.commit(transaction, Arrays.asList(mockCohort1, mockCohort2)); + ListenableFuture future = coordinator.commit(transaction, mockCohort); final CountDownLatch doneLatch = new CountDownLatch(1); final AtomicReference caughtEx = new AtomicReference<>(); @@ -169,35 +158,22 @@ public class ConcurrentDOMDataBrokerTest { assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount()); - InOrder inOrder = inOrder(mockCohort1, mockCohort2); - inOrder.verify(mockCohort1).canCommit(); - inOrder.verify(mockCohort2).canCommit(); - inOrder.verify(mockCohort1).preCommit(); - inOrder.verify(mockCohort2).preCommit(); - inOrder.verify(mockCohort1).commit(); - inOrder.verify(mockCohort2).commit(); + InOrder inOrder = inOrder(mockCohort); + inOrder.verify(mockCohort, times(1)).canCommit(); + inOrder.verify(mockCohort, times(1)).preCommit(); + inOrder.verify(mockCohort, times(1)).commit(); } @Test public void testSubmitWithNegativeCanCommitResponse() throws Exception { - doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).abort(); - - doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort2).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).abort(); - - DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class); - doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort3).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort3).abort(); - - ListenableFuture future = coordinator.commit( - transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3)); + doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort).canCommit(); + doReturn(immediateNullFluentFuture()).when(mockCohort).abort(); - assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3); + assertFailure(coordinator.commit(transaction, mockCohort), null, mockCohort); } private static void assertFailure(final ListenableFuture future, final Exception expCause, - final DOMStoreThreePhaseCommitCohort... mockCohorts) throws Exception { + final DOMStoreThreePhaseCommitCohort mockCohort) throws Exception { try { future.get(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); @@ -206,11 +182,7 @@ public class ConcurrentDOMDataBrokerTest { if (expCause != null) { assertSame("Expected cause", expCause.getClass(), tcf.getCause().getClass()); } - - InOrder inOrder = inOrder((Object[])mockCohorts); - for (DOMStoreThreePhaseCommitCohort c: mockCohorts) { - inOrder.verify(c).abort(); - } + verify(mockCohort, times(1)).abort(); } catch (TimeoutException e) { throw e; } @@ -218,97 +190,42 @@ public class ConcurrentDOMDataBrokerTest { @Test public void testSubmitWithCanCommitException() throws Exception { - doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).abort(); - - IllegalStateException cause = new IllegalStateException("mock"); - doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).abort(); - - FluentFuture future = coordinator.commit( - transaction, Arrays.asList(mockCohort1, mockCohort2)); + final Exception cause = new IllegalStateException("mock"); + doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).canCommit(); + doReturn(immediateNullFluentFuture()).when(mockCohort).abort(); - assertFailure(future, cause, mockCohort1, mockCohort2); - } - - @Test - public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception { - doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).abort(); - NoShardLeaderException rootCause = new NoShardLeaderException("mock"); - DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause); - doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).abort(); - - FluentFuture future = coordinator.commit( - transaction, Arrays.asList(mockCohort1, mockCohort2)); - - assertFailure(future, cause, mockCohort1, mockCohort2); + assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort); } @Test public void testSubmitWithPreCommitException() throws Exception { - doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).abort(); - - doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit(); - IllegalStateException cause = new IllegalStateException("mock"); - doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).abort(); - - DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class); - doReturn(immediateTrueFluentFuture()).when(mockCohort3).canCommit(); - doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))) - .when(mockCohort3).preCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort3).abort(); - - FluentFuture future = coordinator.commit( - transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3)); + doReturn(immediateTrueFluentFuture()).when(mockCohort).canCommit(); + final IllegalStateException cause = new IllegalStateException("mock"); + doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).preCommit(); + doReturn(immediateNullFluentFuture()).when(mockCohort).abort(); - assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3); + assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort); } @Test public void testSubmitWithCommitException() throws Exception { - doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).commit(); - doReturn(immediateNullFluentFuture()).when(mockCohort1).abort(); - - doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).preCommit(); - IllegalStateException cause = new IllegalStateException("mock"); - doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).abort(); - - DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class); - doReturn(immediateTrueFluentFuture()).when(mockCohort3).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort3).preCommit(); - doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))) - .when(mockCohort3).commit(); - doReturn(immediateNullFluentFuture()).when(mockCohort3).abort(); - - FluentFuture future = coordinator.commit( - transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3)); - - assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3); + doReturn(immediateTrueFluentFuture()).when(mockCohort).canCommit(); + doReturn(immediateNullFluentFuture()).when(mockCohort).preCommit(); + final IllegalStateException cause = new IllegalStateException("mock"); + doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).commit(); + doReturn(immediateNullFluentFuture()).when(mockCohort).abort(); + + assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort); } @Test public void testSubmitWithAbortException() throws Exception { - doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit(); - doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error"))) - .when(mockCohort1).abort(); + final Exception canCommitCause = new IllegalStateException("canCommit error"); + doReturn(Futures.immediateFailedFuture(canCommitCause)).when(mockCohort).canCommit(); + final Exception abortCause = new IllegalStateException("abort error"); + doReturn(Futures.immediateFailedFuture(abortCause)).when(mockCohort).abort(); - IllegalStateException cause = new IllegalStateException("mock canCommit error"); - doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohort2).abort(); - - FluentFuture future = coordinator.commit( - transaction, Arrays.asList(mockCohort1, mockCohort2)); - - assertFailure(future, cause, mockCohort1, mockCohort2); + assertFailure(coordinator.commit(transaction, mockCohort), canCommitCause, mockCohort); } @Test @@ -367,11 +284,6 @@ public class ConcurrentDOMDataBrokerTest { verify(configDomStore, never()).newReadWriteTransaction(); verify(operationalDomStore, times(1)).newReadWriteTransaction(); - - dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), mock(NormalizedNode.class)); - - verify(configDomStore, times(1)).newReadWriteTransaction(); - verify(operationalDomStore, times(1)).newReadWriteTransaction(); } } @@ -395,11 +307,6 @@ public class ConcurrentDOMDataBrokerTest { verify(configDomStore, never()).newWriteOnlyTransaction(); verify(operationalDomStore, times(1)).newWriteOnlyTransaction(); - - dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), mock(NormalizedNode.class)); - - verify(configDomStore, times(1)).newWriteOnlyTransaction(); - verify(operationalDomStore, times(1)).newWriteOnlyTransaction(); } } @@ -422,11 +329,6 @@ public class ConcurrentDOMDataBrokerTest { verify(configDomStore, never()).newReadOnlyTransaction(); verify(operationalDomStore, times(1)).newReadOnlyTransaction(); - - dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()); - - verify(configDomStore, times(1)).newReadOnlyTransaction(); - verify(operationalDomStore, times(1)).newReadOnlyTransaction(); } } @@ -435,7 +337,6 @@ public class ConcurrentDOMDataBrokerTest { DOMStore configDomStore = mock(DOMStore.class); DOMStore operationalDomStore = mock(DOMStore.class); DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class); - DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class); doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction(); doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready(); @@ -450,10 +351,10 @@ public class ConcurrentDOMDataBrokerTest { configDomStore), futureExecutor) { @Override public FluentFuture commit(DOMDataTreeWriteTransaction writeTx, - Collection cohorts) { - commitCohorts.addAll(cohorts); + DOMStoreThreePhaseCommitCohort cohort) { + commitCohorts.add(cohort); latch.countDown(); - return super.commit(writeTx, cohorts); + return super.commit(writeTx, cohort); } }) { DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction(); @@ -468,56 +369,6 @@ public class ConcurrentDOMDataBrokerTest { } } - @Test - public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException { - DOMStore configDomStore = mock(DOMStore.class); - DOMStore operationalDomStore = mock(DOMStore.class); - DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class); - DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class); - DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class); - DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class); - - doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction(); - doReturn(configTransaction).when(configDomStore).newReadWriteTransaction(); - - doReturn(mockCohortOperational).when(operationalTransaction).ready(); - doReturn(immediateFalseFluentFuture()).when(mockCohortOperational).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohortOperational).abort(); - - doReturn(mockCohortConfig).when(configTransaction).ready(); - doReturn(immediateFalseFluentFuture()).when(mockCohortConfig).canCommit(); - doReturn(immediateNullFluentFuture()).when(mockCohortConfig).abort(); - - final CountDownLatch latch = new CountDownLatch(1); - final List commitCohorts = new ArrayList<>(); - - try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( - LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION, - configDomStore), futureExecutor) { - @Override - @SuppressWarnings("checkstyle:hiddenField") - public FluentFuture commit(DOMDataTreeWriteTransaction writeTx, - Collection cohorts) { - commitCohorts.addAll(cohorts); - latch.countDown(); - return super.commit(writeTx, cohorts); - } - }) { - DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction(); - - domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(), - mock(NormalizedNode.class)); - domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), - mock(NormalizedNode.class)); - - domDataReadWriteTransaction.commit(); - - assertTrue(latch.await(10, TimeUnit.SECONDS)); - - assertTrue(commitCohorts.size() == 2); - } - } - @Test public void testCreateTransactionChain() { DOMStore domStore = mock(DOMStore.class); @@ -590,16 +441,16 @@ public class ConcurrentDOMDataBrokerTest { assertNotNull(supportedExtensions.getInstance(DOMDataTreeChangeService.class)); DOMDataTreeCommitCohortRegistry cohortRegistry = supportedExtensions.getInstance( - DOMDataTreeCommitCohortRegistry.class); + DOMDataTreeCommitCohortRegistry.class); assertNotNull(cohortRegistry); - DOMDataTreeCommitCohort mockCohort = mock(DOMDataTreeCommitCohort.class); + DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class); DOMDataTreeIdentifier path = new DOMDataTreeIdentifier( org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()); - cohortRegistry.registerCommitCohort(path, mockCohort); + cohortRegistry.registerCommitCohort(path, cohort); - verify(mockConfigStore).registerCommitCohort(path, mockCohort); + verify(mockConfigStore).registerCommitCohort(path, cohort); } } } -- 2.36.6