Fix InMemory shard transaction chaining. 30/45830/1
authorTomas Cere <tcere@cisco.com>
Fri, 9 Sep 2016 15:53:20 +0000 (17:53 +0200)
committerRobert Varga <rovarga@cisco.com>
Mon, 19 Sep 2016 15:07:24 +0000 (17:07 +0200)
In case we had multiple transactions going after each other quickly
the next transaction might not observe the state of the previous transaction yet.
Fix this by reusing the DataTreeModification of the previous transaction.
Also introduce state transitions into InMemoryDOMDataTreeProducer so it
behaves more like a transaction chain.

Change-Id: I96c746c1b91ac9f6b87152dca612094c73c23387
Signed-off-by: Tomas Cere <tcere@cisco.com>
(cherry picked from commit 7d8e59521efbfadd55de34f645db8a8692de6564)

14 files changed:
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java
dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducer.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java
dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardProducerTest.java
dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java
dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohortTest.java
dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransactionTest.java
dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTaskTest.java
dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTaskTest.java

index 837871c03156addd19e4d5dac364fe38ff794ac8..3ae355a071f414227281dc9ebeeed23a4311d1dc 100644 (file)
@@ -262,10 +262,10 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer {
         }
     }
 
-    void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx, final Void result) {
+    void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx) {
         LOG.debug("Transaction {} completed successfully", tx.getIdentifier());
 
-        tx.onTransactionSuccess(result);
+        tx.onTransactionSuccess(null);
         processNextTransaction(tx);
     }
 
@@ -283,7 +283,7 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer {
         }
     }
 
-    private void processNextTransaction(final ShardedDOMDataTreeWriteTransaction tx) {
+    private synchronized void processNextTransaction(final ShardedDOMDataTreeWriteTransaction tx) {
         final boolean wasLast = LAST_UPDATER.compareAndSet(this, tx, null);
         if (wasLast) {
             processCurrentTransaction();
index 24b640501f01cc0bc48d28483070f48e0b384a42..b2deac6179fa5562a384439562f5a5d7244fb0bf 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
@@ -133,7 +134,7 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware
     }
 
     CheckedFuture<Void, TransactionCommitFailedException> doSubmit(
-            BiConsumer<ShardedDOMDataTreeWriteTransaction, Void> success,
+            Consumer<ShardedDOMDataTreeWriteTransaction> success,
             BiConsumer<ShardedDOMDataTreeWriteTransaction, Throwable> failure) {
 
         final Set<DOMDataTreeShardWriteTransaction> txns = ImmutableSet.copyOf(idToTransaction.values());
@@ -148,14 +149,14 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware
         Futures.addCallback(listListenableFuture, new FutureCallback<List<Void>>() {
             @Override
             public void onSuccess(final List<Void> result) {
+                success.accept(ShardedDOMDataTreeWriteTransaction.this);
                 ret.set(null);
-                success.accept(ShardedDOMDataTreeWriteTransaction.this, null);
             }
 
             @Override
             public void onFailure(final Throwable exp) {
-                ret.setException(exp);
                 failure.accept(ShardedDOMDataTreeWriteTransaction.this, exp);
+                ret.setException(exp);
             }
         });
 
index 058769544c0c0ed951192f904973af188ddbf4ed..6e5a41e7d17f442c6085790657a1a9d38610da7a 100644 (file)
@@ -229,15 +229,17 @@ public class ShardedDOMDataTreeTest {
         tx.submit().checkedGet();
 
         final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
-        final Collection<MapEntryNode> innerListMapEntries = createInnerListMapEntries(1000, "run-1");
-        for (final MapEntryNode innerListMapEntry : innerListMapEntries) {
-            final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false);
-            final DOMDataTreeWriteCursor cursor1 = tx1.createCursor(
-                    new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-                            oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME))));
-            cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry);
-            cursor1.close();
-            futures.add(tx1.submit());
+        for (int i = 0; i < 1000; i++) {
+            final Collection<MapEntryNode> innerListMapEntries = createInnerListMapEntries(1000, "run-" + i);
+            for (final MapEntryNode innerListMapEntry : innerListMapEntries) {
+                final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false);
+                final DOMDataTreeWriteCursor cursor1 = tx1.createCursor(
+                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+                                oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME))));
+                cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry);
+                cursor1.close();
+                futures.add(tx1.submit());
+            }
         }
 
         futures.get(futures.size() - 1).checkedGet();
index 031d9b297cc739a465e1d6f2e8e3b11eb6e70157..98ad468a4bb6bfc14c69fbd7105324f17b08a9f1 100644 (file)
@@ -32,6 +32,7 @@ import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -196,13 +197,22 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
         return ret;
     }
 
-    InmemoryDOMDataTreeShardWriteTransaction createTransaction(
-            final InmemoryDOMDataTreeShardWriteTransaction previousTx) {
-        // FIXME: implement this
-        throw new UnsupportedOperationException();
+    DataTreeSnapshot takeSnapshot() {
+        return dataTree.takeSnapshot();
     }
 
-    InmemoryDOMDataTreeShardWriteTransaction createTransaction(final Collection<DOMDataTreeIdentifier> prefixes) {
+    InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
+                                                               final InMemoryDOMDataTreeShardProducer producer,
+                                                               final Collection<DOMDataTreeIdentifier> prefixes,
+                                                               final DataTreeSnapshot snapshot) {
+
+        return createTxForSnapshot(producer, prefixes, (CursorAwareDataTreeSnapshot) snapshot);
+    }
+
+    private InmemoryDOMDataTreeShardWriteTransaction createTxForSnapshot(
+            final InMemoryDOMDataTreeShardProducer producer,
+            final Collection<DOMDataTreeIdentifier> prefixes,
+            final CursorAwareDataTreeSnapshot snapshot) {
 
         final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
         for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
@@ -226,8 +236,7 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
             }
         }
 
-        final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix,
-                (CursorAwareDataTreeSnapshot) dataTree.takeSnapshot());
+        final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix, snapshot);
         final ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext);
         for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
             final ForeignShardModificationContext foreignContext =
@@ -236,6 +245,8 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
             builder.addSubshard(spec.getPrefix(), foreignContext);
         }
 
-        return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree, shardChangePublisher, executor);
+        return new InmemoryDOMDataTreeShardWriteTransaction(producer, builder.build(),
+                dataTree, shardChangePublisher, executor);
     }
+
 }
index 6c0d2775c6c694299ed21edad103b6a5866f7456..dcd867bf3c9dc069b15566f8275a1cbc8cf07222 100644 (file)
@@ -10,32 +10,203 @@ package org.opendaylight.mdsal.dom.store.inmemory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-final class InMemoryDOMDataTreeShardProducer implements DOMDataTreeShardProducer {
+class InMemoryDOMDataTreeShardProducer implements DOMDataTreeShardProducer {
+
+    private abstract static class State {
+        /**
+         * Allocate a new snapshot.
+         *
+         * @return A new snapshot
+         */
+        protected abstract DataTreeSnapshot getSnapshot(Object transactionId);
+    }
+
+    private static final class Idle extends State {
+        private final InMemoryDOMDataTreeShardProducer producer;
+
+        Idle(final InMemoryDOMDataTreeShardProducer producer) {
+            this.producer = Preconditions.checkNotNull(producer);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot(Object transactionId) {
+            return producer.takeSnapshot();
+        }
+    }
+
+    /**
+     * We have a transaction out there.
+     */
+    private static final class Allocated extends State {
+        private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
+        private final InmemoryDOMDataTreeShardWriteTransaction transaction;
+        private volatile DataTreeSnapshot snapshot;
+
+        Allocated(final InmemoryDOMDataTreeShardWriteTransaction transaction) {
+            this.transaction = Preconditions.checkNotNull(transaction);
+        }
+
+        public InmemoryDOMDataTreeShardWriteTransaction getTransaction() {
+            return transaction;
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot(Object transactionId) {
+            final DataTreeSnapshot ret = snapshot;
+            Preconditions.checkState(ret != null,
+                    "Could not get snapshot for transaction %s - previous transaction %s is not ready yet",
+                    transactionId, transaction.getIdentifier());
+            return ret;
+        }
+
+        void setSnapshot(final DataTreeSnapshot snapshot) {
+            final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
+            Preconditions.checkState(success, "Transaction %s has already been marked as ready",
+                    transaction.getIdentifier());
+        }
+    }
+
+    /**
+     * Producer is logically shut down, no further allocation allowed.
+     */
+    private static final class Shutdown extends State {
+        private final String message;
+
+        Shutdown(final String message) {
+            this.message = Preconditions.checkNotNull(message);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot(Object transactionId) {
+            throw new IllegalStateException(message);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class);
+    private static final AtomicLong COUNTER = new AtomicLong();
 
     private final InMemoryDOMDataTreeShard parentShard;
     private final Collection<DOMDataTreeIdentifier> prefixes;
 
-    private InmemoryDOMDataTreeShardWriteTransaction currentTx;
-    private InmemoryDOMDataTreeShardWriteTransaction lastSubmittedTx;
+    private static final AtomicReferenceFieldUpdater<InMemoryDOMDataTreeShardProducer, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(InMemoryDOMDataTreeShardProducer.class, State.class, "state");
+    private final Idle idleState = new Idle(this);
+    private volatile State state;
 
     InMemoryDOMDataTreeShardProducer(final InMemoryDOMDataTreeShard parentShard,
             final Collection<DOMDataTreeIdentifier> prefixes) {
         this.parentShard = Preconditions.checkNotNull(parentShard);
         this.prefixes = ImmutableSet.copyOf(prefixes);
+        state = idleState;
     }
 
     @Override
-    public InmemoryDOMDataTreeShardWriteTransaction createTransaction() {
-//      Preconditions.checkState(currentTx == null || currentTx.isFinished(), "Previous transaction not finished yet.");
-        if (lastSubmittedTx != null) {
-            currentTx = parentShard.createTransaction(lastSubmittedTx);
+    public synchronized InmemoryDOMDataTreeShardWriteTransaction createTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        InmemoryDOMDataTreeShardWriteTransaction ret;
+        String transactionId = nextIdentifier();
+
+        do {
+            entry = getSnapshot(transactionId);
+            ret = parentShard.createTransaction(transactionId, this, prefixes, entry.getValue());
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    synchronized void transactionReady(final InmemoryDOMDataTreeShardWriteTransaction tx,
+                                       final DataTreeModification modification) {
+        final State localState = state;
+        LOG.debug("Transaction was readied {}, current state {}", tx.getIdentifier(), localState);
+
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated) localState;
+            final InmemoryDOMDataTreeShardWriteTransaction transaction = allocated.getTransaction();
+            Preconditions.checkState(tx.equals(transaction),
+                    "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
+            allocated.setSnapshot(modification);
         } else {
-            currentTx = parentShard.createTransaction(prefixes);
+            LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
         }
-        return currentTx;
+    }
+
+    /**
+     * Notify the base logic that a previously-submitted transaction has been committed successfully.
+     *
+     * @param transaction Transaction which completed successfully.
+     */
+    synchronized void onTransactionCommited(final InmemoryDOMDataTreeShardWriteTransaction transaction) {
+        // If the committed transaction was the one we allocated last,
+        // we clear it and the ready snapshot, so the next transaction
+        // allocated refers to the data tree directly.
+        final State localState = state;
+        LOG.debug("Transaction {} commit done, current state {}", transaction.getIdentifier(), localState);
+
+        if (!(localState instanceof Allocated)) {
+            // This can legally happen if the chain is shut down before the transaction was committed
+            // by the backend.
+            LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
+            return;
+        }
+
+        final Allocated allocated = (Allocated) localState;
+        final InmemoryDOMDataTreeShardWriteTransaction tx = allocated.getTransaction();
+        if (!tx.equals(transaction)) {
+            LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
+            return;
+        }
+
+        if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
+            LOG.debug("Producer {} has already transitioned from {} to {}, not making it idle", this,
+                    localState, state);
+        }
+    }
+
+    synchronized void transactionAborted(final InmemoryDOMDataTreeShardWriteTransaction tx) {
+        final State localState = state;
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            if (allocated.getTransaction().equals(tx)) {
+                final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
+                if (!success) {
+                    LOG.warn("Transaction {} aborted, but producer {} state already transitioned from {} to {}",
+                            tx, this, localState, state);
+                }
+            }
+        }
+    }
+
+
+    private Entry<State, DataTreeSnapshot> getSnapshot(String transactionId) {
+        final State localState = state;
+        return new SimpleEntry<>(localState, localState.getSnapshot(transactionId));
+    }
+
+    private boolean recordTransaction(final State expected,
+                                      final InmemoryDOMDataTreeShardWriteTransaction transaction) {
+        final State state = new Allocated(transaction);
+        return STATE_UPDATER.compareAndSet(this, expected, state);
+    }
+
+    private String nextIdentifier() {
+        return "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
+
+    }
+
+    DataTreeSnapshot takeSnapshot() {
+        return parentShard.takeSnapshot();
     }
 
     @Override
index 2ef7ebb28bbc5da9e1eb220362f5ff639b0e1c5a..d3265fdab3716d05d95bad6f62e43c51ca547459 100644 (file)
@@ -16,10 +16,12 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -28,7 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
 
@@ -75,26 +77,39 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         }
     }
 
+    private static final AtomicLong COUNTER = new AtomicLong();
+
     private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
     private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
+    private final InMemoryDOMDataTreeShardProducer producer;
     private final ShardDataModification modification;
     private final ListeningExecutorService executor;
     private final DataTree rootShardDataTree;
+    private final String identifier;
 
     private DataTreeModification rootModification = null;
     private DOMDataTreeWriteCursor cursor;
     private boolean finished = false;
 
-    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
+    InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
+                                             final ShardDataModification root,
                                              final DataTree rootShardDataTree,
                                              final InMemoryDOMDataTreeShardChangePublisher changePublisher,
                                              final ListeningExecutorService executor) {
+        this.producer = producer;
         this.modification = Preconditions.checkNotNull(root);
         this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
         this.changePublisher = Preconditions.checkNotNull(changePublisher);
+        this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
+        LOG.debug("Shard transaction{} created", identifier);
         this.executor = executor;
     }
 
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
     private DOMDataTreeWriteCursor getCursor() {
         if (cursor == null) {
             cursor = new ShardDataModificationCursor(modification, this);
@@ -139,6 +154,7 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         if (cursor != null) {
             cursor.close();
         }
+        producer.transactionAborted(this);
         finished = true;
     }
 
@@ -161,6 +177,7 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
         rootModification = modification.seal();
 
+        producer.transactionReady(this, rootModification);
         cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
                 rootShardDataTree, rootModification, changePublisher));
         for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
@@ -178,7 +195,7 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
 
         final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
-                modification.getPrefix(), cohorts));
+                modification.getPrefix(), cohorts, this));
 
         return submit;
     }
@@ -206,10 +223,19 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
 
         final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
-                modification.getPrefix(), cohorts));
+                modification.getPrefix(), cohorts, this));
         return submit;
     }
 
+    DataTreeModification getRootModification() {
+        Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet");
+        return rootModification;
+    }
+
+    void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
+        producer.onTransactionCommited(tx);
+    }
+
     @Override
     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
         Preconditions.checkState(!finished, "Transaction is finished/closed already.");
index dbbd0abdb2c0e393b56a10116ae53f780832e5d0..a6adbb675f3f3acbfe7b786d32b5926edb6f86a6 100644 (file)
@@ -30,11 +30,14 @@ public class ShardCommitCoordinationTask implements Callable<Void> {
 
     private final DOMDataTreeIdentifier rootShardPrefix;
     private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+    private InmemoryDOMDataTreeShardWriteTransaction transaction;
 
     public ShardCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
-                                       final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+                                       final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
+                                       final InmemoryDOMDataTreeShardWriteTransaction transaction) {
         this.rootShardPrefix = Preconditions.checkNotNull(rootShardPrefix);
         this.cohorts = Preconditions.checkNotNull(cohorts);
+        this.transaction = Preconditions.checkNotNull(transaction);
     }
 
     @Override
@@ -43,6 +46,7 @@ public class ShardCommitCoordinationTask implements Callable<Void> {
         try {
             LOG.debug("Shard {}, commit started", rootShardPrefix);
             commitBlocking();
+            transaction.transactionCommited(transaction);
 
             return null;
         } catch (final TransactionCommitFailedException e) {
index c7b0ee6adc01ae91c3222d6a25f708615f220242..6034e0b6b0f3b41c065071c3b03340ae70873513 100644 (file)
@@ -31,29 +31,34 @@ public class ShardSubmitCoordinationTask implements Callable<Void> {
     private final ShardCanCommitCoordinationTask canCommitCoordinationTask;
     private final ShardPreCommitCoordinationTask preCommitCoordinationTask;
     private final ShardCommitCoordinationTask commitCoordinationTask;
+    private final InmemoryDOMDataTreeShardWriteTransaction transaction;
 
 
     public ShardSubmitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
-                                       final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+                                       final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
+                                       final InmemoryDOMDataTreeShardWriteTransaction transaction) {
         this.rootShardPrefix = Preconditions.checkNotNull(rootShardPrefix);
+        this.transaction = transaction;
 
         canCommitCoordinationTask = new ShardCanCommitCoordinationTask(rootShardPrefix, cohorts);
         preCommitCoordinationTask = new ShardPreCommitCoordinationTask(rootShardPrefix, cohorts);
-        commitCoordinationTask = new ShardCommitCoordinationTask(rootShardPrefix, cohorts);
+        commitCoordinationTask = new ShardCommitCoordinationTask(rootShardPrefix, cohorts, transaction);
     }
 
     @Override
     public Void call() throws TransactionCommitFailedException {
 
-        LOG.debug("Shard {}, CanCommit started", rootShardPrefix);
+        LOG.debug("Shard {}, tx{} CanCommit started", transaction.getIdentifier(), rootShardPrefix);
         canCommitCoordinationTask.canCommitBlocking();
 
-        LOG.debug("Shard {}, PreCommit started", rootShardPrefix);
+        LOG.debug("Shard {}, tx{} PreCommit started", transaction.getIdentifier(), rootShardPrefix);
         preCommitCoordinationTask.preCommitBlocking();
 
-        LOG.debug("Shard {}, commit started", rootShardPrefix);
+        LOG.debug("Shard {}, tx{} commit started", transaction.getIdentifier(), rootShardPrefix);
         commitCoordinationTask.commitBlocking();
 
+        transaction.transactionCommited(transaction);
+
         return null;
     }
 }
index 729211641270355fdfb4554578e5bad1fb0576a7..0115866eb4f422033d5115ac5550c39ad9695dbd 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.mdsal.dom.store.inmemory;
 
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyCollectionOf;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -18,6 +19,7 @@ import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.resetMocks;
 import com.google.common.collect.ImmutableSet;
 import org.junit.Test;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 
 public class InMemoryDOMDataTreeShardProducerTest {
 
@@ -26,15 +28,23 @@ public class InMemoryDOMDataTreeShardProducerTest {
         final InMemoryDOMDataTreeShard inMemoryDOMDataTreeShard = mock(InMemoryDOMDataTreeShard.class);
         final InmemoryDOMDataTreeShardWriteTransaction inmemoryDOMDataTreeShardWriteTransaction =
                 mock(InmemoryDOMDataTreeShardWriteTransaction.class);
+        final CursorAwareDataTreeSnapshot snapshot = mock(CursorAwareDataTreeSnapshot.class);
+        doReturn(snapshot).when(inMemoryDOMDataTreeShard).takeSnapshot();
+
         doReturn(inmemoryDOMDataTreeShardWriteTransaction).when(inMemoryDOMDataTreeShard)
-                .createTransaction(anyCollectionOf((DOMDataTreeIdentifier.class)));
+                .createTransaction(any(String.class), any(InMemoryDOMDataTreeShardProducer.class),
+                        anyCollectionOf((DOMDataTreeIdentifier.class)), any(CursorAwareDataTreeSnapshot.class));
 
         final InMemoryDOMDataTreeShardProducer inMemoryDOMDataTreeShardProducer =
                 new InMemoryDOMDataTreeShardProducer(inMemoryDOMDataTreeShard,
                         ImmutableSet.of(DOM_DATA_TREE_IDENTIFIER));
 
         assertNotNull(inMemoryDOMDataTreeShardProducer.createTransaction());
-        verify(inMemoryDOMDataTreeShard).createTransaction(anyCollectionOf(DOMDataTreeIdentifier.class));
+        verify(inMemoryDOMDataTreeShard).createTransaction(
+                any(String.class),
+                any(InMemoryDOMDataTreeShardProducer.class),
+                anyCollectionOf(DOMDataTreeIdentifier.class),
+                any(CursorAwareDataTreeSnapshot.class));
         resetMocks();
     }
 }
\ No newline at end of file
index add31fec0892226e980305aa0f5cc9780d8658a1..f9b4d4ee0482a80b5f58233e219d886ee1c6737a 100644 (file)
@@ -32,6 +32,8 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 
 public class InMemoryDOMDataTreeShardTest {
 
@@ -57,8 +59,10 @@ public class InMemoryDOMDataTreeShardTest {
         final Collection<DOMDataTreeIdentifier> prefixes = ImmutableList.of(DOM_DATA_TREE_IDENTIFIER);
         assertEquals(prefixes.toString(), inMemoryDOMDataTreeShard.createProducer(prefixes).getPrefixes().toString());
 
+        final InMemoryDOMDataTreeShardProducer mockProducer = mock(InMemoryDOMDataTreeShardProducer.class);
+
         inMemoryDOMDataTreeShard.onGlobalContextUpdated(createTestContext());
-        inMemoryDOMDataTreeShard.createTransaction(prefixes);
+        inMemoryDOMDataTreeShard.createTransaction("", mockProducer, prefixes, mock(CursorAwareDataTreeSnapshot.class));
 
         final DOMDataTreeChangeListener domDataTreeChangeListener = mock(DOMDataTreeChangeListener.class);
         final ListenerRegistration listenerRegistration = mock(ListenerRegistration.class);
@@ -71,7 +75,7 @@ public class InMemoryDOMDataTreeShardTest {
         assertFalse(inMemoryDOMDataTreeShard.getChildShards().containsKey(DOM_DATA_TREE_IDENTIFIER));
     }
 
-    @Test(expected = UnsupportedOperationException.class)
+    @Test
     public void createTransactionWithException() throws Exception {
         final DOMDataTreeIdentifier domDataTreeIdentifier =
                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
@@ -79,11 +83,15 @@ public class InMemoryDOMDataTreeShardTest {
         final InMemoryDOMDataTreeShard inMemoryDOMDataTreeShard =
                 InMemoryDOMDataTreeShard.create(domDataTreeIdentifier,
                         MoreExecutors.newDirectExecutorService(), 1);
+        final CursorAwareDataTreeModification dataTreeModification = mock(CursorAwareDataTreeModification.class);
 
         final InmemoryDOMDataTreeShardWriteTransaction inmemoryDOMDataTreeShardWriteTransaction =
                 mock(InmemoryDOMDataTreeShardWriteTransaction.class);
+        doReturn(dataTreeModification).when(inmemoryDOMDataTreeShardWriteTransaction).getRootModification();
+        final InMemoryDOMDataTreeShardProducer mockProducer = mock(InMemoryDOMDataTreeShardProducer.class);
+        final Collection<DOMDataTreeIdentifier> prefixes = ImmutableList.of(DOM_DATA_TREE_IDENTIFIER);
 
-        inMemoryDOMDataTreeShard.createTransaction(inmemoryDOMDataTreeShardWriteTransaction);
+        inMemoryDOMDataTreeShard.createTransaction("", mockProducer, prefixes, mock(CursorAwareDataTreeSnapshot.class));
     }
 
     @After
index b73d1e30475717dac53c261e199a6d48636c25c1..345ddabbe1686ac722222182638a77349e34d551 100644 (file)
@@ -45,7 +45,7 @@ public class InMemoryDOMDataTreeShardThreePhaseCommitCohortTest {
     private static final InMemoryDOMDataTreeShardThreePhaseCommitCohort
             IN_MEMORY_DOM_DATA_TREE_SHARD_THREE_PHASE_COMMIT_COHORT =
                 new InMemoryDOMDataTreeShardThreePhaseCommitCohort(DATA_TREE, DATA_TREE_MODIFICATION,
-                    IN_MEMORY_DOM_DATA_TREE_SHARD_CHANGE_PUBLISHER);
+                        IN_MEMORY_DOM_DATA_TREE_SHARD_CHANGE_PUBLISHER);
 
     @Before
     public void setUp() throws Exception {
index 7d657e62aa31863bc0cd82255867112c7eacd76e..08eb6adfb2125f73edfb1aa50b1a2b972d003990 100644 (file)
@@ -58,6 +58,7 @@ public class InmemoryDOMDataTreeShardWriteTransactionTest {
             new ChildShardContext(DOM_DATA_TREE_IDENTIFIER, READABLE_WRITEABLE_DOM_DATA_TREE_SHARD);
     private static final Map<DOMDataTreeIdentifier, ChildShardContext> CHILD_SHARDS =
             ImmutableMap.of(DOM_DATA_TREE_IDENTIFIER, CHILD_SHARD_CONTEXT);
+    private InMemoryDOMDataTreeShardProducer mockProducer;
 
     @Before
     public void setUp() throws Exception {
@@ -81,9 +82,13 @@ public class InmemoryDOMDataTreeShardWriteTransactionTest {
         final InMemoryDOMDataTreeShardChangePublisher inMemoryDOMDataTreeShardChangePublisher =
                 new InMemoryDOMDataTreeShardChangePublisher(MoreExecutors.newDirectExecutorService(), 1, DATA_TREE,
                         YANG_INSTANCE_IDENTIFIER, CHILD_SHARDS);
+        mockProducer = mock(InMemoryDOMDataTreeShardProducer.class);
+        doNothing().when(mockProducer).transactionReady(any(), any());
+        doNothing().when(mockProducer).onTransactionCommited(any());
+        doNothing().when(mockProducer).transactionAborted(any());
 
         inmemoryDOMDataTreeShardWriteTransaction =
-                new InmemoryDOMDataTreeShardWriteTransaction(shardDataModification, DATA_TREE,
+                new InmemoryDOMDataTreeShardWriteTransaction(mockProducer, shardDataModification, DATA_TREE,
                         inMemoryDOMDataTreeShardChangePublisher,
                         MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
     }
index 24a2047b53eb25dd8071f5691a9c11233d2c96c8..13f76b425ede750abf544f168f7f65f7b030fb5a 100644 (file)
@@ -7,8 +7,11 @@
  */
 package org.opendaylight.mdsal.dom.store.inmemory;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.COHORTS;
 import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.DOM_DATA_TREE_IDENTIFIER;
@@ -17,11 +20,20 @@ import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.LISTENABLE_FUT
 import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.resetMocks;
 
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 
 public class ShardCommitCoordinationTaskTest {
 
+    final InmemoryDOMDataTreeShardWriteTransaction mockTx = mock(InmemoryDOMDataTreeShardWriteTransaction.class);
+
+    @Before
+    public void setUp() throws Exception {
+        doReturn("MockedTx").when(mockTx).toString();
+        doNothing().when(mockTx).transactionCommited(any());
+    }
+
     @Test
     public void basicTest() throws Exception {
         doReturn(Void.TYPE).when(LISTENABLE_FUTURE).get();
@@ -30,7 +42,7 @@ public class ShardCommitCoordinationTaskTest {
         COHORTS.add(DOM_STORE_THREE_PHASE_COMMIT_COHORT);
 
         ShardCommitCoordinationTask shardCommitCoordinationTask =
-                new ShardCommitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, COHORTS);
+                new ShardCommitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, COHORTS, mockTx);
 
         shardCommitCoordinationTask.call();
         verify(DOM_STORE_THREE_PHASE_COMMIT_COHORT).commit();
@@ -43,7 +55,7 @@ public class ShardCommitCoordinationTaskTest {
 
         COHORTS.add(DOM_STORE_THREE_PHASE_COMMIT_COHORT);
         ShardCommitCoordinationTask shardCommitCoordinationTask =
-                new ShardCommitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, COHORTS);
+                new ShardCommitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, COHORTS, mockTx);
         shardCommitCoordinationTask.call();
     }
 
index 1352ed1d9c1d6252081fdd8a31bdaf265c72bd7d..ccbf0ba6b7b3f6ee00f535ef608674b50278ac6f 100644 (file)
@@ -7,21 +7,33 @@
  */
 package org.opendaylight.mdsal.dom.store.inmemory;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.mdsal.dom.store.inmemory.TestUtils.DOM_DATA_TREE_IDENTIFIER;
 
 import java.lang.reflect.Field;
 import java.util.Collections;
+import org.junit.Before;
 import org.junit.Test;
 
 public class ShardSubmitCoordinationTaskTest {
 
+    private final InmemoryDOMDataTreeShardWriteTransaction tx = mock(InmemoryDOMDataTreeShardWriteTransaction.class);
+
+    @Before
+    public void setUp() throws Exception {
+        doReturn("TestTx").when(tx).getIdentifier();
+        doReturn("TestTx").when(tx).toString();
+        doNothing().when(tx).transactionCommited(any());
+    }
+
     @Test
     public void basicTest() throws Exception {
         final ShardSubmitCoordinationTask shardSubmitCoordinationTask =
-                new ShardSubmitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, Collections.EMPTY_SET);
+                new ShardSubmitCoordinationTask(DOM_DATA_TREE_IDENTIFIER, Collections.EMPTY_SET, tx);
 
         final ShardCanCommitCoordinationTask canCommitCoordinationTask = mock(ShardCanCommitCoordinationTask.class);
         doNothing().when(canCommitCoordinationTask).canCommitBlocking();