import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
-import java.util.Collection;
-import java.util.EnumMap;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.util.Map;
+import java.util.Map.Entry;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionDatastoreMismatchException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory;
public abstract class AbstractDOMBrokerTransaction<T extends DOMStoreTransaction> implements DOMDataTreeTransaction {
- private final EnumMap<LogicalDatastoreType, T> backingTxs;
- private final Object identifier;
+ private static final VarHandle BACKING_TX;
+
+ static {
+ try {
+ BACKING_TX = MethodHandles.lookup()
+ .findVarHandle(AbstractDOMBrokerTransaction.class, "backingTx", Entry.class);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ private final @NonNull Object identifier;
private final Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories;
+ private volatile Entry<LogicalDatastoreType, T> backingTx;
+
/**
- * Creates new composite Transactions.
+ * Creates new transaction.
*
* @param identifier Identifier of transaction.
*/
Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
this.identifier = requireNonNull(identifier, "Identifier should not be null");
this.storeTxFactories = requireNonNull(storeTxFactories, "Store Transaction Factories should not be null");
- this.backingTxs = new EnumMap<>(LogicalDatastoreType.class);
+ checkArgument(!storeTxFactories.isEmpty(), "Store Transaction Factories should not be empty");
}
/**
- * Returns subtransaction associated with supplied key.
+ * Returns sub-transaction associated with supplied key.
*
- * @param key the data store type key
- * @return the subtransaction
- * @throws NullPointerException
- * if key is null
- * @throws IllegalArgumentException
- * if no subtransaction is associated with key.
+ * @param datastoreType the data store type
+ * @return the sub-transaction
+ * @throws NullPointerException if datastoreType is null
+ * @throws IllegalArgumentException if no sub-transaction is associated with datastoreType.
+ * @throws TransactionDatastoreMismatchException if datastoreType mismatches the one used at first access
*/
- protected final T getSubtransaction(final LogicalDatastoreType key) {
- requireNonNull(key, "key must not be null.");
+ protected final T getSubtransaction(final LogicalDatastoreType datastoreType) {
+ requireNonNull(datastoreType, "datastoreType must not be null.");
- T ret = backingTxs.get(key);
- if (ret == null) {
- ret = createTransaction(key);
- backingTxs.put(key, ret);
+ var entry = backingTx;
+ if (entry == null) {
+ if (!storeTxFactories.containsKey(datastoreType)) {
+ throw new IllegalArgumentException(datastoreType + " is not supported");
+ }
+ final var tx = createTransaction(datastoreType);
+ final var newEntry = Map.entry(datastoreType, tx);
+ final var witness = (Entry<LogicalDatastoreType, T>) BACKING_TX.compareAndExchange(this, null, newEntry);
+ if (witness != null) {
+ tx.close();
+ entry = witness;
+ } else {
+ entry = newEntry;
+ }
}
- checkArgument(ret != null, "No subtransaction associated with %s", key);
- return ret;
- }
- protected abstract T createTransaction(LogicalDatastoreType key);
+ final var expected = entry.getKey();
+ if (expected != datastoreType) {
+ throw new TransactionDatastoreMismatchException(expected, datastoreType);
+ }
+ return entry.getValue();
+ }
/**
- * Returns immutable Iterable of all subtransactions.
- *
+ * Returns sub-transaction if initialized.
*/
- protected Collection<T> getSubtransactions() {
- return backingTxs.values();
+ protected T getSubtransaction() {
+ final Entry<LogicalDatastoreType, T> entry;
+ return (entry = backingTx) == null ? null : entry.getValue();
}
+ protected abstract T createTransaction(LogicalDatastoreType datastoreType);
+
@Override
public Object getIdentifier() {
return identifier;
}
@SuppressWarnings("checkstyle:IllegalCatch")
- protected void closeSubtransactions() {
- /*
- * We share one exception for all failures, which are added
- * as supressedExceptions to it.
- */
- IllegalStateException failure = null;
- for (T subtransaction : backingTxs.values()) {
+ protected void closeSubtransaction() {
+ if (backingTx != null) {
try {
- subtransaction.close();
+ backingTx.getValue().close();
} catch (Exception e) {
- // If we did not allocated failure we allocate it
- if (failure == null) {
- failure = new IllegalStateException("Uncaught exception occured during closing transaction", e);
- } else {
- // We update it with additional exceptions, which occurred during error.
- failure.addSuppressed(e);
- }
+ throw new IllegalStateException("Uncaught exception occurred during closing transaction", e);
}
}
- // If we have failure, we throw it at after all attempts to close.
- if (failure != null) {
- throw failure;
- }
}
protected DOMStoreTransactionFactory getTxFactory(LogicalDatastoreType type) {
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
+import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFailedFluentFuture;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.broker.TransactionCommitFailedExceptionMapper;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
if (impl != null) {
LOG.trace("Transaction {} cancelled before submit", getIdentifier());
FUTURE_UPDATER.lazySet(this, CANCELLED_FUTURE);
- closeSubtransactions();
+ closeSubtransaction();
return true;
}
final AbstractDOMTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
checkRunning(impl);
- final Collection<T> txns = getSubtransactions();
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(txns.size());
-
FluentFuture<? extends CommitInfo> ret;
- try {
- for (final T txn : txns) {
- cohorts.add(txn.ready());
+ final var tx = getSubtransaction();
+ if (tx == null) {
+ ret = CommitInfo.emptyFluentFuture();
+ } else {
+ try {
+ ret = impl.commit(this, tx.ready());
+ } catch (RuntimeException e) {
+ ret = immediateFailedFluentFuture(TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e));
}
-
- ret = impl.commit(this, cohorts);
- } catch (RuntimeException e) {
- ret = FluentFuture.from(Futures.immediateFailedFuture(
- TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e)));
}
+
FUTURE_UPDATER.lazySet(this, ret);
return ret;
}
import com.google.common.util.concurrent.FluentFuture;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
-import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;
import org.opendaylight.mdsal.common.api.CommitInfo;
* Submits a transaction asynchronously for commit.
*
* @param transaction the transaction to submit
- * @param cohorts the associated cohorts
+ * @param cohort the associated cohort
* @return a resulting Future
*/
protected abstract FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction transaction,
- Collection<DOMStoreThreePhaseCommitCohort> cohorts);
+ DOMStoreThreePhaseCommitCohort cohort);
/**
* Creates a new read-only transaction.
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@Override
protected FluentFuture<? extends CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final DOMStoreThreePhaseCommitCohort cohort) {
checkArgument(transaction != null, "Transaction must not be null.");
- checkArgument(cohorts != null, "Cohorts must not be null.");
+ checkArgument(cohort != null, "Cohorts must not be null.");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- if (cohorts.isEmpty()) {
- return CommitInfo.emptyFluentFuture();
- }
-
final AsyncNotifyingSettableFuture clientSubmitFuture =
new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
- doCanCommit(clientSubmitFuture, transaction, cohorts);
-
+ doCanCommit(clientSubmitFuture, transaction, cohort);
return FluentFuture.from(clientSubmitFuture);
}
private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataTreeWriteTransaction transaction,
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
-
+ final DOMStoreThreePhaseCommitCohort cohort) {
final long startTime = System.nanoTime();
- final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
-
- // Not using Futures.allAsList here to avoid its internal overhead.
- FutureCallback<Boolean> futureCallback = new FutureCallback<>() {
+ Futures.addCallback(cohort.canCommit(), new FutureCallback<>() {
@Override
public void onSuccess(final Boolean result) {
if (result == null || !result) {
- handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER,
- new TransactionCommitFailedException("Can Commit failed, no detailed cause available."));
- } else if (!cohortIterator.hasNext()) {
- // All cohorts completed successfully - we can move on to the preCommit phase
- doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ onFailure(new TransactionCommitFailedException("Can Commit failed, no detailed cause available."));
} else {
- Futures.addCallback(cohortIterator.next().canCommit(), this, MoreExecutors.directExecutor());
+ doPreCommit(startTime, clientSubmitFuture, transaction, cohort);
}
}
@Override
public void onFailure(final Throwable failure) {
- handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure);
+ handleException(clientSubmitFuture, transaction, cohort, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure);
}
- };
-
- Futures.addCallback(cohortIterator.next().canCommit(), futureCallback, MoreExecutors.directExecutor());
+ }, MoreExecutors.directExecutor());
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
- final DOMDataTreeWriteTransaction transaction,
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
-
- final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
-
- // Not using Futures.allAsList here to avoid its internal overhead.
- FutureCallback<Empty> futureCallback = new FutureCallback<>() {
+ final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
+ Futures.addCallback(cohort.preCommit(), new FutureCallback<>() {
@Override
public void onSuccess(final Empty result) {
- if (!cohortIterator.hasNext()) {
- // All cohorts completed successfully - we can move on to the commit phase
- doCommit(startTime, clientSubmitFuture, transaction, cohorts);
- } else {
- Futures.addCallback(cohortIterator.next().preCommit(), this, MoreExecutors.directExecutor());
- }
+ doCommit(startTime, clientSubmitFuture, transaction, cohort);
}
@Override
public void onFailure(final Throwable failure) {
- handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT, PRE_COMMIT_MAPPER, failure);
+ handleException(clientSubmitFuture, transaction, cohort, PRE_COMMIT, PRE_COMMIT_MAPPER, failure);
}
- };
-
- Futures.addCallback(cohortIterator.next().preCommit(), futureCallback, MoreExecutors.directExecutor());
+ }, MoreExecutors.directExecutor());
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
- final DOMDataTreeWriteTransaction transaction,
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
-
- final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
-
- // Not using Futures.allAsList here to avoid its internal overhead.
- final FutureCallback<CommitInfo> futureCallback = new FutureCallback<>() {
+ final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
+ Futures.addCallback(cohort.commit(), new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
- if (!cohortIterator.hasNext()) {
- // All cohorts completed successfully - we're done.
- commitStatsTracker.addDuration(System.nanoTime() - startTime);
-
- clientSubmitFuture.set();
- } else {
- Futures.addCallback(cohortIterator.next().commit(), this, MoreExecutors.directExecutor());
- }
+ commitStatsTracker.addDuration(System.nanoTime() - startTime);
+ clientSubmitFuture.set();
}
@Override
public void onFailure(final Throwable throwable) {
- handleException(clientSubmitFuture, transaction, cohorts, COMMIT, COMMIT_ERROR_MAPPER, throwable);
+ handleException(clientSubmitFuture, transaction, cohort, COMMIT, COMMIT_ERROR_MAPPER, throwable);
}
- };
-
- Futures.addCallback(cohortIterator.next().commit(), futureCallback, MoreExecutors.directExecutor());
+ }, MoreExecutors.directExecutor());
}
private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
- final DOMDataTreeWriteTransaction transaction,
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
- final String phase, final TransactionCommitFailedExceptionMapper exMapper,
- final Throwable throwable) {
-
+ final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort,
+ final String phase, final TransactionCommitFailedExceptionMapper exMapper, final Throwable throwable) {
if (clientSubmitFuture.isDone()) {
// We must have had failures from multiple cohorts.
return;
// Use debug instead of warn level here because this exception gets propagate back to the caller via the Future
LOG.debug("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, throwable);
- // Transaction failed - tell all cohorts to abort.
- @SuppressWarnings("unchecked")
- ListenableFuture<Empty>[] canCommitFutures = new ListenableFuture[cohorts.size()];
- int index = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- canCommitFutures[index++] = cohort.abort();
- }
-
// Propagate the original exception
final Exception e;
if (throwable instanceof NoShardLeaderException || throwable instanceof ShardLeaderNotRespondingException) {
e = new DataStoreUnavailableException(throwable.getMessage(), throwable);
} else if (throwable instanceof Exception) {
- e = (Exception)throwable;
+ e = (Exception) throwable;
} else {
e = new RuntimeException("Unexpected error occurred", throwable);
}
clientSubmitFuture.setException(exMapper.apply(e));
- ListenableFuture<List<Empty>> combinedFuture = Futures.allAsList(canCommitFutures);
- Futures.addCallback(combinedFuture, new FutureCallback<List<Empty>>() {
+ // abort
+ Futures.addCallback(cohort.abort(), new FutureCallback<Empty>() {
@Override
- public void onSuccess(final List<Empty> result) {
+ public void onSuccess(final Empty result) {
// Propagate the original exception to the client.
LOG.debug("Tx: {} aborted successfully", transaction.getIdentifier());
}
@Override
public void close() {
- closeSubtransactions();
+ closeSubtransaction();
}
@Override
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
@Override
public FluentFuture<? extends CommitInfo> commit(
- final DOMDataTreeWriteTransaction transaction, final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
checkNotFailed();
checkNotClosed();
- final FluentFuture<? extends CommitInfo> ret = broker.commit(transaction, cohorts);
+ final FluentFuture<? extends CommitInfo> ret = broker.commit(transaction, cohort);
COUNTER_UPDATER.incrementAndGet(this);
ret.addCallback(new FutureCallback<CommitInfo>() {
*/
package org.opendaylight.controller.cluster.databroker;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
+import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
import com.google.common.util.concurrent.FluentFuture;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.TransactionDatastoreMismatchException;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionFactory;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class AbstractDOMBrokerWriteTransactionTest {
+
+ @Mock
+ private DOMStoreTransactionFactory txFactory;
@Mock
private AbstractDOMTransactionFactory<?> abstractDOMTransactionFactory;
@Mock
extends AbstractDOMBrokerWriteTransaction<DOMStoreWriteTransaction> {
AbstractDOMBrokerWriteTransactionTestImpl() {
- super(new Object(), Collections.emptyMap(), abstractDOMTransactionFactory);
+ this(Map.of(CONFIGURATION, txFactory));
+ }
+
+ AbstractDOMBrokerWriteTransactionTestImpl(Map<LogicalDatastoreType, DOMStoreTransactionFactory> txFactoryMap) {
+ super(new Object(), txFactoryMap, abstractDOMTransactionFactory);
}
@Override
protected DOMStoreWriteTransaction createTransaction(final LogicalDatastoreType key) {
- return null;
+ return domStoreWriteTransaction;
}
@Override
- protected Collection<DOMStoreWriteTransaction> getSubtransactions() {
- return Collections.singletonList(domStoreWriteTransaction);
+ protected DOMStoreWriteTransaction getSubtransaction() {
+ return domStoreWriteTransaction;
}
}
@Test
- public void readyRuntimeExceptionAndCancel() throws InterruptedException {
+ public void readyRuntimeExceptionAndCancel() {
RuntimeException thrown = new RuntimeException();
doThrow(thrown).when(domStoreWriteTransaction).ready();
AbstractDOMBrokerWriteTransactionTestImpl abstractDOMBrokerWriteTransactionTestImpl =
new AbstractDOMBrokerWriteTransactionTestImpl();
FluentFuture<? extends CommitInfo> submitFuture = abstractDOMBrokerWriteTransactionTestImpl.commit();
- try {
- submitFuture.get();
- Assert.fail("TransactionCommitFailedException expected");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TransactionCommitFailedException);
- assertTrue(e.getCause().getCause() == thrown);
- abstractDOMBrokerWriteTransactionTestImpl.cancel();
- }
+ final var cause = assertThrows(ExecutionException.class, submitFuture::get).getCause();
+ assertTrue(cause instanceof TransactionCommitFailedException);
+ assertSame(thrown, cause.getCause());
+ abstractDOMBrokerWriteTransactionTestImpl.cancel();
}
@Test
= new AbstractDOMBrokerWriteTransactionTestImpl();
FluentFuture<? extends CommitInfo> submitFuture = abstractDOMBrokerWriteTransactionTestImpl.commit();
- try {
- submitFuture.get();
- Assert.fail("TransactionCommitFailedException expected");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TransactionCommitFailedException);
- assertTrue(e.getCause().getCause() == thrown);
- abstractDOMBrokerWriteTransactionTestImpl.cancel();
- }
+ final var cause = assertThrows(ExecutionException.class, submitFuture::get).getCause();
+ assertTrue(cause instanceof TransactionCommitFailedException);
+ assertSame(thrown, cause.getCause());
+ abstractDOMBrokerWriteTransactionTestImpl.cancel();
+ }
+
+ @Test
+ public void getSubtransactionStoreMismatch() {
+ final var testTx = new AbstractDOMBrokerWriteTransactionTestImpl(
+ Map.of(CONFIGURATION, txFactory, OPERATIONAL, txFactory));
+
+ assertEquals(domStoreWriteTransaction, testTx.getSubtransaction(CONFIGURATION));
+
+ final var exception = assertThrows(TransactionDatastoreMismatchException.class,
+ () -> testTx.getSubtransaction(OPERATIONAL));
+ assertEquals(CONFIGURATION, exception.expected());
+ assertEquals(OPERATIONAL, exception.encountered());
+ }
+
+ @Test
+ public void getSubtransactionStoreUndefined() {
+ final var testTx = new AbstractDOMBrokerWriteTransactionTestImpl(Map.of(OPERATIONAL, txFactory));
+
+ final var exception = assertThrows(IllegalArgumentException.class,
+ () -> testTx.getSubtransaction(CONFIGURATION));
+ assertEquals("CONFIGURATION is not supported", exception.getMessage());
}
}
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.mockito.InOrder;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension;
public class ConcurrentDOMDataBrokerTest {
private final DOMDataTreeWriteTransaction transaction = mock(DOMDataTreeWriteTransaction.class);
- private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
- private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
+ private final DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
private final ThreadPoolExecutor futureExecutor =
new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<>());
private ConcurrentDOMDataBroker coordinator;
final SettableFuture<Boolean> future = SettableFuture.create();
if (doAsync) {
new Thread(() -> {
- Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
- 10, TimeUnit.SECONDS);
+ Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue, 10, TimeUnit.SECONDS);
future.set(Boolean.TRUE);
}).start();
} else {
return future;
};
- doAnswer(asyncCanCommit).when(mockCohort1).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).commit();
+ doAnswer(asyncCanCommit).when(mockCohort).canCommit();
+ doReturn(immediateNullFluentFuture()).when(mockCohort).preCommit();
+ doReturn(immediateNullFluentFuture()).when(mockCohort).commit();
- doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).preCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).commit();
-
- ListenableFuture<? extends CommitInfo> future =
- coordinator.commit(transaction, Arrays.asList(mockCohort1, mockCohort2));
+ ListenableFuture<? extends CommitInfo> future = coordinator.commit(transaction, mockCohort);
final CountDownLatch doneLatch = new CountDownLatch(1);
final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
- InOrder inOrder = inOrder(mockCohort1, mockCohort2);
- inOrder.verify(mockCohort1).canCommit();
- inOrder.verify(mockCohort2).canCommit();
- inOrder.verify(mockCohort1).preCommit();
- inOrder.verify(mockCohort2).preCommit();
- inOrder.verify(mockCohort1).commit();
- inOrder.verify(mockCohort2).commit();
+ InOrder inOrder = inOrder(mockCohort);
+ inOrder.verify(mockCohort, times(1)).canCommit();
+ inOrder.verify(mockCohort, times(1)).preCommit();
+ inOrder.verify(mockCohort, times(1)).commit();
}
@Test
public void testSubmitWithNegativeCanCommitResponse() throws Exception {
- doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-
- doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort2).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
- DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
- doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort3).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
-
- ListenableFuture<? extends CommitInfo> future = coordinator.commit(
- transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
+ doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort).canCommit();
+ doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
- assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
+ assertFailure(coordinator.commit(transaction, mockCohort), null, mockCohort);
}
private static void assertFailure(final ListenableFuture<?> future, final Exception expCause,
- final DOMStoreThreePhaseCommitCohort... mockCohorts) throws Exception {
+ final DOMStoreThreePhaseCommitCohort mockCohort) throws Exception {
try {
future.get(5, TimeUnit.SECONDS);
fail("Expected TransactionCommitFailedException");
if (expCause != null) {
assertSame("Expected cause", expCause.getClass(), tcf.getCause().getClass());
}
-
- InOrder inOrder = inOrder((Object[])mockCohorts);
- for (DOMStoreThreePhaseCommitCohort c: mockCohorts) {
- inOrder.verify(c).abort();
- }
+ verify(mockCohort, times(1)).abort();
} catch (TimeoutException e) {
throw e;
}
@Test
public void testSubmitWithCanCommitException() throws Exception {
- doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-
- IllegalStateException cause = new IllegalStateException("mock");
- doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
- FluentFuture<? extends CommitInfo> future = coordinator.commit(
- transaction, Arrays.asList(mockCohort1, mockCohort2));
+ final Exception cause = new IllegalStateException("mock");
+ doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).canCommit();
+ doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
- assertFailure(future, cause, mockCohort1, mockCohort2);
- }
-
- @Test
- public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception {
- doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
- NoShardLeaderException rootCause = new NoShardLeaderException("mock");
- DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause);
- doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
- FluentFuture<? extends CommitInfo> future = coordinator.commit(
- transaction, Arrays.asList(mockCohort1, mockCohort2));
-
- assertFailure(future, cause, mockCohort1, mockCohort2);
+ assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
}
@Test
public void testSubmitWithPreCommitException() throws Exception {
- doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-
- doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
- IllegalStateException cause = new IllegalStateException("mock");
- doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
- DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
- doReturn(immediateTrueFluentFuture()).when(mockCohort3).canCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2")))
- .when(mockCohort3).preCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
-
- FluentFuture<? extends CommitInfo> future = coordinator.commit(
- transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
+ doReturn(immediateTrueFluentFuture()).when(mockCohort).canCommit();
+ final IllegalStateException cause = new IllegalStateException("mock");
+ doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).preCommit();
+ doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
- assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
+ assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
}
@Test
public void testSubmitWithCommitException() throws Exception {
- doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).commit();
- doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
-
- doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).preCommit();
- IllegalStateException cause = new IllegalStateException("mock");
- doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
- DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
- doReturn(immediateTrueFluentFuture()).when(mockCohort3).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort3).preCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2")))
- .when(mockCohort3).commit();
- doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
-
- FluentFuture<? extends CommitInfo> future = coordinator.commit(
- transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
-
- assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
+ doReturn(immediateTrueFluentFuture()).when(mockCohort).canCommit();
+ doReturn(immediateNullFluentFuture()).when(mockCohort).preCommit();
+ final IllegalStateException cause = new IllegalStateException("mock");
+ doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).commit();
+ doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
+
+ assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
}
@Test
public void testSubmitWithAbortException() throws Exception {
- doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error")))
- .when(mockCohort1).abort();
+ final Exception canCommitCause = new IllegalStateException("canCommit error");
+ doReturn(Futures.immediateFailedFuture(canCommitCause)).when(mockCohort).canCommit();
+ final Exception abortCause = new IllegalStateException("abort error");
+ doReturn(Futures.immediateFailedFuture(abortCause)).when(mockCohort).abort();
- IllegalStateException cause = new IllegalStateException("mock canCommit error");
- doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
-
- FluentFuture<? extends CommitInfo> future = coordinator.commit(
- transaction, Arrays.asList(mockCohort1, mockCohort2));
-
- assertFailure(future, cause, mockCohort1, mockCohort2);
+ assertFailure(coordinator.commit(transaction, mockCohort), canCommitCause, mockCohort);
}
@Test
verify(configDomStore, never()).newReadWriteTransaction();
verify(operationalDomStore, times(1)).newReadWriteTransaction();
-
- dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
-
- verify(configDomStore, times(1)).newReadWriteTransaction();
- verify(operationalDomStore, times(1)).newReadWriteTransaction();
}
}
verify(configDomStore, never()).newWriteOnlyTransaction();
verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
-
- dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
-
- verify(configDomStore, times(1)).newWriteOnlyTransaction();
- verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
}
}
verify(configDomStore, never()).newReadOnlyTransaction();
verify(operationalDomStore, times(1)).newReadOnlyTransaction();
-
- dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty());
-
- verify(configDomStore, times(1)).newReadOnlyTransaction();
- verify(operationalDomStore, times(1)).newReadOnlyTransaction();
}
}
DOMStore configDomStore = mock(DOMStore.class);
DOMStore operationalDomStore = mock(DOMStore.class);
DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
- DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
configDomStore), futureExecutor) {
@Override
public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
- Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
- commitCohorts.addAll(cohorts);
+ DOMStoreThreePhaseCommitCohort cohort) {
+ commitCohorts.add(cohort);
latch.countDown();
- return super.commit(writeTx, cohorts);
+ return super.commit(writeTx, cohort);
}
}) {
DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
}
}
- @Test
- public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException {
- DOMStore configDomStore = mock(DOMStore.class);
- DOMStore operationalDomStore = mock(DOMStore.class);
- DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
- DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class);
- DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class);
- DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class);
-
- doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction();
- doReturn(configTransaction).when(configDomStore).newReadWriteTransaction();
-
- doReturn(mockCohortOperational).when(operationalTransaction).ready();
- doReturn(immediateFalseFluentFuture()).when(mockCohortOperational).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohortOperational).abort();
-
- doReturn(mockCohortConfig).when(configTransaction).ready();
- doReturn(immediateFalseFluentFuture()).when(mockCohortConfig).canCommit();
- doReturn(immediateNullFluentFuture()).when(mockCohortConfig).abort();
-
- final CountDownLatch latch = new CountDownLatch(1);
- final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
-
- try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
- LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
- configDomStore), futureExecutor) {
- @Override
- @SuppressWarnings("checkstyle:hiddenField")
- public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
- Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
- commitCohorts.addAll(cohorts);
- latch.countDown();
- return super.commit(writeTx, cohorts);
- }
- }) {
- DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
-
- domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(),
- mock(NormalizedNode.class));
- domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(),
- mock(NormalizedNode.class));
-
- domDataReadWriteTransaction.commit();
-
- assertTrue(latch.await(10, TimeUnit.SECONDS));
-
- assertTrue(commitCohorts.size() == 2);
- }
- }
-
@Test
public void testCreateTransactionChain() {
DOMStore domStore = mock(DOMStore.class);
assertNotNull(supportedExtensions.getInstance(DOMDataTreeChangeService.class));
DOMDataTreeCommitCohortRegistry cohortRegistry = supportedExtensions.getInstance(
- DOMDataTreeCommitCohortRegistry.class);
+ DOMDataTreeCommitCohortRegistry.class);
assertNotNull(cohortRegistry);
- DOMDataTreeCommitCohort mockCohort = mock(DOMDataTreeCommitCohort.class);
+ DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
DOMDataTreeIdentifier path = new DOMDataTreeIdentifier(
org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.empty());
- cohortRegistry.registerCommitCohort(path, mockCohort);
+ cohortRegistry.registerCommitCohort(path, cohort);
- verify(mockConfigStore).registerCommitCohort(path, mockCohort);
+ verify(mockConfigStore).registerCommitCohort(path, cohort);
}
}
}