From: Tomas Cere Date: Fri, 9 Sep 2016 15:53:20 +0000 (+0200) Subject: Fix InMemory shard transaction chaining. X-Git-Tag: release/boron-sr1~29 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=mdsal.git;a=commitdiff_plain;h=dca009bba2d4ceb2e13537f3ac6f9a5f1b05302f Fix InMemory shard transaction chaining. In case we had multiple transactions going after each other quickly the next transaction might not observe the state of the previous transaction yet. Fix this by reusing the DataTreeModification of the previous transaction. Also introduce state transitions into InMemoryDOMDataTreeProducer so it behaves more like a transaction chain. Change-Id: I96c746c1b91ac9f6b87152dca612094c73c23387 Signed-off-by: Tomas Cere (cherry picked from commit 7d8e59521efbfadd55de34f645db8a8692de6564) --- diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java index 837871c031..3ae355a071 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java @@ -262,10 +262,10 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { } } - void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx, final Void result) { + void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx) { LOG.debug("Transaction {} completed successfully", tx.getIdentifier()); - tx.onTransactionSuccess(result); + tx.onTransactionSuccess(null); processNextTransaction(tx); } @@ -283,7 +283,7 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { } } - private void processNextTransaction(final ShardedDOMDataTreeWriteTransaction tx) { + private synchronized void processNextTransaction(final ShardedDOMDataTreeWriteTransaction tx) { final boolean wasLast = LAST_UPDATER.compareAndSet(this, tx, null); if (wasLast) { processCurrentTransaction(); diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java index 24b640501f..b2deac6179 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java @@ -24,6 +24,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; @@ -133,7 +134,7 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware } CheckedFuture doSubmit( - BiConsumer success, + Consumer success, BiConsumer failure) { final Set txns = ImmutableSet.copyOf(idToTransaction.values()); @@ -148,14 +149,14 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware Futures.addCallback(listListenableFuture, new FutureCallback>() { @Override public void onSuccess(final List result) { + success.accept(ShardedDOMDataTreeWriteTransaction.this); ret.set(null); - success.accept(ShardedDOMDataTreeWriteTransaction.this, null); } @Override public void onFailure(final Throwable exp) { - ret.setException(exp); failure.accept(ShardedDOMDataTreeWriteTransaction.this, exp); + ret.setException(exp); } }); diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java index 058769544c..6e5a41e7d1 100644 --- a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java +++ b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java @@ -229,15 +229,17 @@ public class ShardedDOMDataTreeTest { tx.submit().checkedGet(); final ArrayList> futures = new ArrayList<>(); - final Collection innerListMapEntries = createInnerListMapEntries(1000, "run-1"); - for (final MapEntryNode innerListMapEntry : innerListMapEntries) { - final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false); - final DOMDataTreeWriteCursor cursor1 = tx1.createCursor( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, - oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME)))); - cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry); - cursor1.close(); - futures.add(tx1.submit()); + for (int i = 0; i < 1000; i++) { + final Collection innerListMapEntries = createInnerListMapEntries(1000, "run-" + i); + for (final MapEntryNode innerListMapEntry : innerListMapEntries) { + final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false); + final DOMDataTreeWriteCursor cursor1 = tx1.createCursor( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, + oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME)))); + cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry); + cursor1.close(); + futures.add(tx1.submit()); + } } futures.get(futures.size() - 1).checkedGet(); diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java index 031d9b297c..98ad468a4b 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java @@ -32,6 +32,7 @@ import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -196,13 +197,22 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha return ret; } - InmemoryDOMDataTreeShardWriteTransaction createTransaction( - final InmemoryDOMDataTreeShardWriteTransaction previousTx) { - // FIXME: implement this - throw new UnsupportedOperationException(); + DataTreeSnapshot takeSnapshot() { + return dataTree.takeSnapshot(); } - InmemoryDOMDataTreeShardWriteTransaction createTransaction(final Collection prefixes) { + InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId, + final InMemoryDOMDataTreeShardProducer producer, + final Collection prefixes, + final DataTreeSnapshot snapshot) { + + return createTxForSnapshot(producer, prefixes, (CursorAwareDataTreeSnapshot) snapshot); + } + + private InmemoryDOMDataTreeShardWriteTransaction createTxForSnapshot( + final InMemoryDOMDataTreeShardProducer producer, + final Collection prefixes, + final CursorAwareDataTreeSnapshot snapshot) { final Map affectedSubshards = new HashMap<>(); for (final DOMDataTreeIdentifier producerPrefix : prefixes) { @@ -226,8 +236,7 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha } } - final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix, - (CursorAwareDataTreeSnapshot) dataTree.takeSnapshot()); + final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix, snapshot); final ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext); for (final SubshardProducerSpecification spec : affectedSubshards.values()) { final ForeignShardModificationContext foreignContext = @@ -236,6 +245,8 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha builder.addSubshard(spec.getPrefix(), foreignContext); } - return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree, shardChangePublisher, executor); + return new InmemoryDOMDataTreeShardWriteTransaction(producer, builder.build(), + dataTree, shardChangePublisher, executor); } + } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducer.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducer.java index 6c0d2775c6..dcd867bf3c 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducer.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducer.java @@ -10,32 +10,203 @@ package org.opendaylight.mdsal.dom.store.inmemory; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import java.util.AbstractMap.SimpleEntry; import java.util.Collection; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -final class InMemoryDOMDataTreeShardProducer implements DOMDataTreeShardProducer { +class InMemoryDOMDataTreeShardProducer implements DOMDataTreeShardProducer { + + private abstract static class State { + /** + * Allocate a new snapshot. + * + * @return A new snapshot + */ + protected abstract DataTreeSnapshot getSnapshot(Object transactionId); + } + + private static final class Idle extends State { + private final InMemoryDOMDataTreeShardProducer producer; + + Idle(final InMemoryDOMDataTreeShardProducer producer) { + this.producer = Preconditions.checkNotNull(producer); + } + + @Override + protected DataTreeSnapshot getSnapshot(Object transactionId) { + return producer.takeSnapshot(); + } + } + + /** + * We have a transaction out there. + */ + private static final class Allocated extends State { + private static final AtomicReferenceFieldUpdater SNAPSHOT_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot"); + private final InmemoryDOMDataTreeShardWriteTransaction transaction; + private volatile DataTreeSnapshot snapshot; + + Allocated(final InmemoryDOMDataTreeShardWriteTransaction transaction) { + this.transaction = Preconditions.checkNotNull(transaction); + } + + public InmemoryDOMDataTreeShardWriteTransaction getTransaction() { + return transaction; + } + + @Override + protected DataTreeSnapshot getSnapshot(Object transactionId) { + final DataTreeSnapshot ret = snapshot; + Preconditions.checkState(ret != null, + "Could not get snapshot for transaction %s - previous transaction %s is not ready yet", + transactionId, transaction.getIdentifier()); + return ret; + } + + void setSnapshot(final DataTreeSnapshot snapshot) { + final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot); + Preconditions.checkState(success, "Transaction %s has already been marked as ready", + transaction.getIdentifier()); + } + } + + /** + * Producer is logically shut down, no further allocation allowed. + */ + private static final class Shutdown extends State { + private final String message; + + Shutdown(final String message) { + this.message = Preconditions.checkNotNull(message); + } + + @Override + protected DataTreeSnapshot getSnapshot(Object transactionId) { + throw new IllegalStateException(message); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class); + private static final AtomicLong COUNTER = new AtomicLong(); private final InMemoryDOMDataTreeShard parentShard; private final Collection prefixes; - private InmemoryDOMDataTreeShardWriteTransaction currentTx; - private InmemoryDOMDataTreeShardWriteTransaction lastSubmittedTx; + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(InMemoryDOMDataTreeShardProducer.class, State.class, "state"); + private final Idle idleState = new Idle(this); + private volatile State state; InMemoryDOMDataTreeShardProducer(final InMemoryDOMDataTreeShard parentShard, final Collection prefixes) { this.parentShard = Preconditions.checkNotNull(parentShard); this.prefixes = ImmutableSet.copyOf(prefixes); + state = idleState; } @Override - public InmemoryDOMDataTreeShardWriteTransaction createTransaction() { -// Preconditions.checkState(currentTx == null || currentTx.isFinished(), "Previous transaction not finished yet."); - if (lastSubmittedTx != null) { - currentTx = parentShard.createTransaction(lastSubmittedTx); + public synchronized InmemoryDOMDataTreeShardWriteTransaction createTransaction() { + Entry entry; + InmemoryDOMDataTreeShardWriteTransaction ret; + String transactionId = nextIdentifier(); + + do { + entry = getSnapshot(transactionId); + ret = parentShard.createTransaction(transactionId, this, prefixes, entry.getValue()); + } while (!recordTransaction(entry.getKey(), ret)); + + return ret; + } + + synchronized void transactionReady(final InmemoryDOMDataTreeShardWriteTransaction tx, + final DataTreeModification modification) { + final State localState = state; + LOG.debug("Transaction was readied {}, current state {}", tx.getIdentifier(), localState); + + if (localState instanceof Allocated) { + final Allocated allocated = (Allocated) localState; + final InmemoryDOMDataTreeShardWriteTransaction transaction = allocated.getTransaction(); + Preconditions.checkState(tx.equals(transaction), + "Mis-ordered ready transaction %s last allocated was %s", tx, transaction); + allocated.setSnapshot(modification); } else { - currentTx = parentShard.createTransaction(prefixes); + LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState); } - return currentTx; + } + + /** + * Notify the base logic that a previously-submitted transaction has been committed successfully. + * + * @param transaction Transaction which completed successfully. + */ + synchronized void onTransactionCommited(final InmemoryDOMDataTreeShardWriteTransaction transaction) { + // If the committed transaction was the one we allocated last, + // we clear it and the ready snapshot, so the next transaction + // allocated refers to the data tree directly. + final State localState = state; + LOG.debug("Transaction {} commit done, current state {}", transaction.getIdentifier(), localState); + + if (!(localState instanceof Allocated)) { + // This can legally happen if the chain is shut down before the transaction was committed + // by the backend. + LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState); + return; + } + + final Allocated allocated = (Allocated) localState; + final InmemoryDOMDataTreeShardWriteTransaction tx = allocated.getTransaction(); + if (!tx.equals(transaction)) { + LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated); + return; + } + + if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) { + LOG.debug("Producer {} has already transitioned from {} to {}, not making it idle", this, + localState, state); + } + } + + synchronized void transactionAborted(final InmemoryDOMDataTreeShardWriteTransaction tx) { + final State localState = state; + if (localState instanceof Allocated) { + final Allocated allocated = (Allocated)localState; + if (allocated.getTransaction().equals(tx)) { + final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState); + if (!success) { + LOG.warn("Transaction {} aborted, but producer {} state already transitioned from {} to {}", + tx, this, localState, state); + } + } + } + } + + + private Entry getSnapshot(String transactionId) { + final State localState = state; + return new SimpleEntry<>(localState, localState.getSnapshot(transactionId)); + } + + private boolean recordTransaction(final State expected, + final InmemoryDOMDataTreeShardWriteTransaction transaction) { + final State state = new Allocated(transaction); + return STATE_UPDATER.compareAndSet(this, expected, state); + } + + private String nextIdentifier() { + return "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement(); + + } + + DataTreeSnapshot takeSnapshot() { + return parentShard.takeSnapshot(); } @Override diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java index 2ef7ebb28b..d3265fdab3 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java @@ -16,10 +16,12 @@ import com.google.common.util.concurrent.ListeningExecutorService; import java.util.ArrayList; import java.util.Iterator; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -28,7 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction { +class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable { private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class); @@ -75,26 +77,39 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT } } + private static final AtomicLong COUNTER = new AtomicLong(); + private final ArrayList cohorts = new ArrayList<>(); private final InMemoryDOMDataTreeShardChangePublisher changePublisher; + private final InMemoryDOMDataTreeShardProducer producer; private final ShardDataModification modification; private final ListeningExecutorService executor; private final DataTree rootShardDataTree; + private final String identifier; private DataTreeModification rootModification = null; private DOMDataTreeWriteCursor cursor; private boolean finished = false; - InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, + InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer, + final ShardDataModification root, final DataTree rootShardDataTree, final InMemoryDOMDataTreeShardChangePublisher changePublisher, final ListeningExecutorService executor) { + this.producer = producer; this.modification = Preconditions.checkNotNull(root); this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree); this.changePublisher = Preconditions.checkNotNull(changePublisher); + this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement(); + LOG.debug("Shard transaction{} created", identifier); this.executor = executor; } + @Override + public String getIdentifier() { + return identifier; + } + private DOMDataTreeWriteCursor getCursor() { if (cursor == null) { cursor = new ShardDataModificationCursor(modification, this); @@ -139,6 +154,7 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT if (cursor != null) { cursor.close(); } + producer.transactionAborted(this); finished = true; } @@ -161,6 +177,7 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT LOG.debug("Readying open transaction on shard {}", modification.getPrefix()); rootModification = modification.seal(); + producer.transactionReady(this, rootModification); cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort( rootShardDataTree, rootModification, changePublisher)); for (final Entry entry : @@ -178,7 +195,7 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet."); final ListenableFuture submit = executor.submit(new ShardSubmitCoordinationTask( - modification.getPrefix(), cohorts)); + modification.getPrefix(), cohorts, this)); return submit; } @@ -206,10 +223,19 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT LOG.debug("Commit open transaction on shard {}", modification.getPrefix()); final ListenableFuture submit = executor.submit(new ShardCommitCoordinationTask( - modification.getPrefix(), cohorts)); + modification.getPrefix(), cohorts, this)); return submit; } + DataTreeModification getRootModification() { + Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet"); + return rootModification; + } + + void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) { + producer.onTransactionCommited(tx); + } + @Override public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) { Preconditions.checkState(!finished, "Transaction is finished/closed already."); diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java index dbbd0abdb2..a6adbb675f 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java @@ -30,11 +30,14 @@ public class ShardCommitCoordinationTask implements Callable { private final DOMDataTreeIdentifier rootShardPrefix; private final Collection cohorts; + private InmemoryDOMDataTreeShardWriteTransaction transaction; public ShardCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix, - final Collection cohorts) { + final Collection cohorts, + final InmemoryDOMDataTreeShardWriteTransaction transaction) { this.rootShardPrefix = Preconditions.checkNotNull(rootShardPrefix); this.cohorts = Preconditions.checkNotNull(cohorts); + this.transaction = Preconditions.checkNotNull(transaction); } @Override @@ -43,6 +46,7 @@ public class ShardCommitCoordinationTask implements Callable { try { LOG.debug("Shard {}, commit started", rootShardPrefix); commitBlocking(); + transaction.transactionCommited(transaction); return null; } catch (final TransactionCommitFailedException e) { diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java index c7b0ee6adc..6034e0b6b0 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java @@ -31,29 +31,34 @@ public class ShardSubmitCoordinationTask implements Callable { private final ShardCanCommitCoordinationTask canCommitCoordinationTask; private final ShardPreCommitCoordinationTask preCommitCoordinationTask; private final ShardCommitCoordinationTask commitCoordinationTask; + private final InmemoryDOMDataTreeShardWriteTransaction transaction; public ShardSubmitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix, - final Collection cohorts) { + final Collection cohorts, + final InmemoryDOMDataTreeShardWriteTransaction transaction) { this.rootShardPrefix = Preconditions.checkNotNull(rootShardPrefix); + this.transaction = transaction; canCommitCoordinationTask = new ShardCanCommitCoordinationTask(rootShardPrefix, cohorts); preCommitCoordinationTask = new ShardPreCommitCoordinationTask(rootShardPrefix, cohorts); - commitCoordinationTask = new ShardCommitCoordinationTask(rootShardPrefix, cohorts); + commitCoordinationTask = new ShardCommitCoordinationTask(rootShardPrefix, cohorts, transaction); } @Override public Void call() throws TransactionCommitFailedException { - LOG.debug("Shard {}, CanCommit started", rootShardPrefix); + LOG.debug("Shard {}, tx{} CanCommit started", transaction.getIdentifier(), rootShardPrefix); canCommitCoordinationTask.canCommitBlocking(); - LOG.debug("Shard {}, PreCommit started", rootShardPrefix); + LOG.debug("Shard {}, tx{} PreCommit started", transaction.getIdentifier(), rootShardPrefix); preCommitCoordinationTask.preCommitBlocking(); - LOG.debug("Shard {}, commit started", rootShardPrefix); + LOG.debug("Shard {}, tx{} commit started", transaction.getIdentifier(), rootShardPrefix); commitCoordinationTask.commitBlocking(); + transaction.transactionCommited(transaction); + return null; } } diff --git a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducerTest.java b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducerTest.java index 7292116412..0115866eb4 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducerTest.java +++ b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducerTest.java @@ -8,6 +8,7 @@ package org.opendaylight.mdsal.dom.store.inmemory; import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollectionOf; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -18,6 +19,7 @@ import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.resetMocks; import com.google.common.collect.ImmutableSet; import org.junit.Test; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; public class InMemoryDOMDataTreeShardProducerTest { @@ -26,15 +28,23 @@ public class InMemoryDOMDataTreeShardProducerTest { final InMemoryDOMDataTreeShard inMemoryDOMDataTreeShard = mock(InMemoryDOMDataTreeShard.class); final InmemoryDOMDataTreeShardWriteTransaction inmemoryDOMDataTreeShardWriteTransaction = mock(InmemoryDOMDataTreeShardWriteTransaction.class); + final CursorAwareDataTreeSnapshot snapshot = mock(CursorAwareDataTreeSnapshot.class); + doReturn(snapshot).when(inMemoryDOMDataTreeShard).takeSnapshot(); + doReturn(inmemoryDOMDataTreeShardWriteTransaction).when(inMemoryDOMDataTreeShard) - .createTransaction(anyCollectionOf((DOMDataTreeIdentifier.class))); + .createTransaction(any(String.class), any(InMemoryDOMDataTreeShardProducer.class), + anyCollectionOf((DOMDataTreeIdentifier.class)), any(CursorAwareDataTreeSnapshot.class)); final InMemoryDOMDataTreeShardProducer inMemoryDOMDataTreeShardProducer = new InMemoryDOMDataTreeShardProducer(inMemoryDOMDataTreeShard, ImmutableSet.of(DOM_DATA_TREE_IDENTIFIER)); assertNotNull(inMemoryDOMDataTreeShardProducer.createTransaction()); - verify(inMemoryDOMDataTreeShard).createTransaction(anyCollectionOf(DOMDataTreeIdentifier.class)); + verify(inMemoryDOMDataTreeShard).createTransaction( + any(String.class), + any(InMemoryDOMDataTreeShardProducer.class), + anyCollectionOf(DOMDataTreeIdentifier.class), + any(CursorAwareDataTreeSnapshot.class)); resetMocks(); } } \ No newline at end of file diff --git a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java index add31fec08..f9b4d4ee04 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java +++ b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java @@ -32,6 +32,8 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; public class InMemoryDOMDataTreeShardTest { @@ -57,8 +59,10 @@ public class InMemoryDOMDataTreeShardTest { final Collection prefixes = ImmutableList.of(DOM_DATA_TREE_IDENTIFIER); assertEquals(prefixes.toString(), inMemoryDOMDataTreeShard.createProducer(prefixes).getPrefixes().toString()); + final InMemoryDOMDataTreeShardProducer mockProducer = mock(InMemoryDOMDataTreeShardProducer.class); + inMemoryDOMDataTreeShard.onGlobalContextUpdated(createTestContext()); - inMemoryDOMDataTreeShard.createTransaction(prefixes); + inMemoryDOMDataTreeShard.createTransaction("", mockProducer, prefixes, mock(CursorAwareDataTreeSnapshot.class)); final DOMDataTreeChangeListener domDataTreeChangeListener = mock(DOMDataTreeChangeListener.class); final ListenerRegistration listenerRegistration = mock(ListenerRegistration.class); @@ -71,7 +75,7 @@ public class InMemoryDOMDataTreeShardTest { assertFalse(inMemoryDOMDataTreeShard.getChildShards().containsKey(DOM_DATA_TREE_IDENTIFIER)); } - @Test(expected = UnsupportedOperationException.class) + @Test public void createTransactionWithException() throws Exception { final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); @@ -79,11 +83,15 @@ public class InMemoryDOMDataTreeShardTest { final InMemoryDOMDataTreeShard inMemoryDOMDataTreeShard = InMemoryDOMDataTreeShard.create(domDataTreeIdentifier, MoreExecutors.newDirectExecutorService(), 1); + final CursorAwareDataTreeModification dataTreeModification = mock(CursorAwareDataTreeModification.class); final InmemoryDOMDataTreeShardWriteTransaction inmemoryDOMDataTreeShardWriteTransaction = mock(InmemoryDOMDataTreeShardWriteTransaction.class); + doReturn(dataTreeModification).when(inmemoryDOMDataTreeShardWriteTransaction).getRootModification(); + final InMemoryDOMDataTreeShardProducer mockProducer = mock(InMemoryDOMDataTreeShardProducer.class); + final Collection prefixes = ImmutableList.of(DOM_DATA_TREE_IDENTIFIER); - inMemoryDOMDataTreeShard.createTransaction(inmemoryDOMDataTreeShardWriteTransaction); + inMemoryDOMDataTreeShard.createTransaction("", mockProducer, prefixes, mock(CursorAwareDataTreeSnapshot.class)); } @After diff --git a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohortTest.java b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohortTest.java index b73d1e3047..345ddabbe1 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohortTest.java +++ b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohortTest.java @@ -45,7 +45,7 @@ public class InMemoryDOMDataTreeShardThreePhaseCommitCohortTest { private static final InMemoryDOMDataTreeShardThreePhaseCommitCohort IN_MEMORY_DOM_DATA_TREE_SHARD_THREE_PHASE_COMMIT_COHORT = new InMemoryDOMDataTreeShardThreePhaseCommitCohort(DATA_TREE, DATA_TREE_MODIFICATION, - IN_MEMORY_DOM_DATA_TREE_SHARD_CHANGE_PUBLISHER); + IN_MEMORY_DOM_DATA_TREE_SHARD_CHANGE_PUBLISHER); @Before public void setUp() throws Exception { diff --git a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransactionTest.java b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransactionTest.java index 7d657e62aa..08eb6adfb2 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransactionTest.java +++ b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransactionTest.java @@ -58,6 +58,7 @@ public class InmemoryDOMDataTreeShardWriteTransactionTest { new ChildShardContext(DOM_DATA_TREE_IDENTIFIER, READABLE_WRITEABLE_DOM_DATA_TREE_SHARD); private static final Map CHILD_SHARDS = ImmutableMap.of(DOM_DATA_TREE_IDENTIFIER, CHILD_SHARD_CONTEXT); + private InMemoryDOMDataTreeShardProducer mockProducer; @Before public void setUp() throws Exception { @@ -81,9 +82,13 @@ public class InmemoryDOMDataTreeShardWriteTransactionTest { final InMemoryDOMDataTreeShardChangePublisher inMemoryDOMDataTreeShardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(MoreExecutors.newDirectExecutorService(), 1, DATA_TREE, YANG_INSTANCE_IDENTIFIER, CHILD_SHARDS); + mockProducer = mock(InMemoryDOMDataTreeShardProducer.class); + doNothing().when(mockProducer).transactionReady(any(), any()); + doNothing().when(mockProducer).onTransactionCommited(any()); + doNothing().when(mockProducer).transactionAborted(any()); inmemoryDOMDataTreeShardWriteTransaction = - new InmemoryDOMDataTreeShardWriteTransaction(shardDataModification, DATA_TREE, + new InmemoryDOMDataTreeShardWriteTransaction(mockProducer, shardDataModification, DATA_TREE, inMemoryDOMDataTreeShardChangePublisher, MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())); } diff --git a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTaskTest.java b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTaskTest.java index 24a2047b53..13f76b425e 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTaskTest.java +++ b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTaskTest.java @@ -7,8 +7,11 @@ */ package org.opendaylight.mdsal.dom.store.inmemory; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.COHORTS; import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.DOM_DATA_TREE_IDENTIFIER; @@ -17,11 +20,20 @@ import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.LISTENABLE_FUT import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.resetMocks; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; public class ShardCommitCoordinationTaskTest { + final InmemoryDOMDataTreeShardWriteTransaction mockTx = mock(InmemoryDOMDataTreeShardWriteTransaction.class); + + @Before + public void setUp() throws Exception { + doReturn("MockedTx").when(mockTx).toString(); + doNothing().when(mockTx).transactionCommited(any()); + } + @Test public void basicTest() throws Exception { doReturn(Void.TYPE).when(LISTENABLE_FUTURE).get(); @@ -30,7 +42,7 @@ public class ShardCommitCoordinationTaskTest { COHORTS.add(DOM_STORE_THREE_PHASE_COMMIT_COHORT); ShardCommitCoordinationTask shardCommitCoordinationTask = - new ShardCommitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, COHORTS); + new ShardCommitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, COHORTS, mockTx); shardCommitCoordinationTask.call(); verify(DOM_STORE_THREE_PHASE_COMMIT_COHORT).commit(); @@ -43,7 +55,7 @@ public class ShardCommitCoordinationTaskTest { COHORTS.add(DOM_STORE_THREE_PHASE_COMMIT_COHORT); ShardCommitCoordinationTask shardCommitCoordinationTask = - new ShardCommitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, COHORTS); + new ShardCommitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, COHORTS, mockTx); shardCommitCoordinationTask.call(); } diff --git a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTaskTest.java b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTaskTest.java index 1352ed1d9c..ccbf0ba6b7 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTaskTest.java +++ b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTaskTest.java @@ -7,21 +7,33 @@ */ package org.opendaylight.mdsal.dom.store.inmemory; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.DOM_DATA_TREE_IDENTIFIER; import java.lang.reflect.Field; import java.util.Collections; +import org.junit.Before; import org.junit.Test; public class ShardSubmitCoordinationTaskTest { + private final InmemoryDOMDataTreeShardWriteTransaction tx = mock(InmemoryDOMDataTreeShardWriteTransaction.class); + + @Before + public void setUp() throws Exception { + doReturn("TestTx").when(tx).getIdentifier(); + doReturn("TestTx").when(tx).toString(); + doNothing().when(tx).transactionCommited(any()); + } + @Test public void basicTest() throws Exception { final ShardSubmitCoordinationTask shardSubmitCoordinationTask = - new ShardSubmitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, Collections.EMPTY_SET); + new ShardSubmitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, Collections.EMPTY_SET, tx); final ShardCanCommitCoordinationTask canCommitCoordinationTask = mock(ShardCanCommitCoordinationTask.class); doNothing().when(canCommitCoordinationTask).canCommitBlocking();