Merge "Bug 1073: Added Transaction Chain support to InMemoryDataTreeModification."
authorEd Warnicke <eaw@cisco.com>
Fri, 6 Jun 2014 12:21:03 +0000 (12:21 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 6 Jun 2014 12:21:03 +0000 (12:21 +0000)
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/data/InMemoryDataTreeModification.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java

index 2495146..25e6d04 100644 (file)
@@ -13,6 +13,8 @@ import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
@@ -41,20 +43,22 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
 /**
  * In-memory DOM Data Store
- *
- * Implementation of {@link DOMStore} which uses {@link DataTree}
- * and other classes such as {@link SnapshotBackedWriteTransaction}.
+ * 
+ * Implementation of {@link DOMStore} which uses {@link DataTree} and other
+ * classes such as {@link SnapshotBackedWriteTransaction}.
  * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
  * to implement {@link DOMStore} contract.
- *
+ * 
  */
-public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener, TransactionReadyPrototype {
+public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
+        TransactionReadyPrototype {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
     private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
     private final ListenerTree listenerTree = ListenerTree.create();
@@ -104,7 +108,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         /*
          * Make sure commit is not occurring right now. Listener has to be
          * registered and its state capture enqueued at a consistent point.
-         *
+         * 
          * FIXME: improve this to read-write lock, such that multiple listener
          * registrations can occur simultaneously
          */
@@ -148,14 +152,22 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
     private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
 
-        private SnapshotBackedWriteTransaction previousOutstandingTx;
+        @GuardedBy("this")
+        private SnapshotBackedWriteTransaction latestOutstandingTx;
+
+        private boolean chainFailed = false;
+
+        private void checkFailed() {
+            Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
+        }
 
         @Override
         public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
             final DataTreeSnapshot snapshot;
-            if(previousOutstandingTx != null) {
-                checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
-                snapshot = previousOutstandingTx.getMutatedView();
+            checkFailed();
+            if (latestOutstandingTx != null) {
+                checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+                snapshot = latestOutstandingTx.getMutatedView();
             } else {
                 snapshot = dataTree.takeSnapshot();
             }
@@ -165,42 +177,112 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
             final DataTreeSnapshot snapshot;
-            if(previousOutstandingTx != null) {
-                checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
-                snapshot = previousOutstandingTx.getMutatedView();
+            checkFailed();
+            if (latestOutstandingTx != null) {
+                checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+                snapshot = latestOutstandingTx.getMutatedView();
             } else {
-                snapshot = dataTree.takeSnapshot().newModification();
+                snapshot = dataTree.takeSnapshot();
             }
-            SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot,this);
+            final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
+                    snapshot, this);
+            latestOutstandingTx = ret;
             return ret;
         }
 
         @Override
         public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
             final DataTreeSnapshot snapshot;
-            if(previousOutstandingTx != null) {
-                checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
-                snapshot = previousOutstandingTx.getMutatedView();
+            checkFailed();
+            if (latestOutstandingTx != null) {
+                checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+                snapshot = latestOutstandingTx.getMutatedView();
             } else {
-                snapshot = dataTree.takeSnapshot().newModification();
+                snapshot = dataTree.takeSnapshot();
             }
-            SnapshotBackedWriteTransaction ret =new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,this);
+            final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
+                    this);
+            latestOutstandingTx = ret;
             return ret;
         }
 
         @Override
         public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
             DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
-            // FIXME: We probably want to add Transaction Chain cohort
-            return storeCohort;
+            return new ChainedTransactionCommitImpl(tx, storeCohort, this);
         }
 
         @Override
         public void close() {
-            // TODO Auto-generated method stub
 
         }
 
+        protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
+                final Throwable t) {
+            chainFailed = true;
+
+        }
+
+        public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
+            // If commited transaction is latestOutstandingTx we clear
+            // latestOutstandingTx
+            // field in order to base new transactions on Datastore Data Tree
+            // directly.
+            if (transaction.equals(latestOutstandingTx)) {
+                latestOutstandingTx = null;
+            }
+        }
+
+    }
+
+    private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
+
+        private final SnapshotBackedWriteTransaction transaction;
+        private final DOMStoreThreePhaseCommitCohort delegate;
+
+        private final DOMStoreTransactionChainImpl txChain;
+
+        protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
+                final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
+            super();
+            this.transaction = transaction;
+            this.delegate = delegate;
+            this.txChain = txChain;
+        }
+
+        @Override
+        public ListenableFuture<Boolean> canCommit() {
+            return delegate.canCommit();
+        }
+
+        @Override
+        public ListenableFuture<Void> preCommit() {
+            return delegate.preCommit();
+        }
+
+        @Override
+        public ListenableFuture<Void> abort() {
+            return delegate.abort();
+        }
+
+        @Override
+        public ListenableFuture<Void> commit() {
+            ListenableFuture<Void> commitFuture = delegate.commit();
+            Futures.addCallback(commitFuture, new FutureCallback<Void>() {
+                @Override
+                public void onFailure(final Throwable t) {
+                    txChain.onTransactionFailed(transaction, t);
+                }
+
+                @Override
+                public void onSuccess(final Void result) {
+                    txChain.onTransactionCommited(transaction);
+                }
+
+            });
+            return commitFuture;
+        }
+
     }
 
     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
@@ -226,7 +308,8 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                         LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
                         return true;
                     } catch (DataPreconditionFailedException e) {
-                        LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e);
+                        LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
+                                e.getPath(), e);
                         return false;
                     }
                 }
index f7e95b8..39ff4f0 100644 (file)
@@ -12,6 +12,7 @@ import java.util.Map.Entry;
 import javax.annotation.concurrent.GuardedBy;
 
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeModification;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.spi.TreeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
@@ -151,7 +152,27 @@ final class InMemoryDataTreeModification implements DataTreeModification {
     public synchronized DataTreeModification newModification() {
         Preconditions.checkState(sealed, "Attempted to chain on an unsealed modification");
 
-        // FIXME: transaction chaining
-        throw new UnsupportedOperationException("Implement this as part of transaction chaining");
+        if(rootNode.getType() == ModificationType.UNMODIFIED) {
+            return snapshot.newModification();
+        }
+
+        /*
+         *  FIXME: Add advanced transaction chaining for modification of not rebased
+         *  modification.
+         *
+         *  Current computation of tempRoot may yeld incorrect subtree versions
+         *  if there are multiple concurrent transactions, which may break
+         *  versioning preconditions for modification of previously occured write,
+         *  directly nested under parent node, since node version is derived from
+         *  subtree version.
+         *
+         *  For deeper nodes subtree version is derived from their respective metadata
+         *  nodes, so this incorrect root subtree version is not affecting us.
+         */
+        TreeNode originalSnapshotRoot = snapshot.getRootNode();
+        Optional<TreeNode> tempRoot = strategyTree.apply(rootNode, Optional.of(originalSnapshotRoot), originalSnapshotRoot.getSubtreeVersion().next());
+
+        InMemoryDataTreeSnapshot tempTree = new InMemoryDataTreeSnapshot(snapshot.getSchemaContext(), tempRoot.get(), strategyTree);
+        return tempTree.newModification();
     }
 }
index c0f0a35..94ac8d6 100644 (file)
@@ -12,6 +12,7 @@ import org.junit.Test;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -25,7 +26,6 @@ public class InMemoryDataStoreTest {
     private SchemaContext schemaContext;
     private InMemoryDOMDataStore domStore;
 
-
     @Before
     public void setupStore() {
         domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
@@ -34,40 +34,36 @@ public class InMemoryDataStoreTest {
 
     }
 
-
     @Test
     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
 
         assertNotNull(domStore);
 
-
         DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
         assertNotNull(readTx);
 
         DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
         assertNotNull(writeTx);
         /**
-         *
+         * 
          * Writes /test in writeTx
-         *
+         * 
          */
         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         /**
-         *
-         * Reads /test from writeTx
-         * Read should return container.
-         *
+         * 
+         * Reads /test from writeTx Read should return container.
+         * 
          */
         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
         assertTrue(writeTxContainer.get().isPresent());
 
         /**
-        *
-        * Reads /test from readTx
-        * Read should return Absent.
-        *
-        */
+         * 
+         * Reads /test from readTx Read should return Absent.
+         * 
+         */
         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx.read(TestModel.TEST_PATH);
         assertFalse(readTxContainer.get().isPresent());
     }
@@ -78,17 +74,16 @@ public class InMemoryDataStoreTest {
         DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
         assertNotNull(writeTx);
         /**
-         *
+         * 
          * Writes /test in writeTx
-         *
+         * 
          */
         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         /**
-         *
-         * Reads /test from writeTx
-         * Read should return container.
-         *
+         * 
+         * Reads /test from writeTx Read should return container.
+         * 
          */
         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
         assertTrue(writeTxContainer.get().isPresent());
@@ -97,7 +92,8 @@ public class InMemoryDataStoreTest {
 
         assertThreePhaseCommit(cohort);
 
-        Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH).get();
+        Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
+                .get();
         assertTrue(afterCommitRead.isPresent());
     }
 
@@ -115,10 +111,91 @@ public class InMemoryDataStoreTest {
         cohort.preCommit().get();
         cohort.abort().get();
 
-        Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH).get();
+        Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
+                .get();
         assertFalse(afterCommitRead.isPresent());
     }
 
+    @Test
+    public void testTransactionChain() throws InterruptedException, ExecutionException {
+        DOMStoreTransactionChain txChain = domStore.createTransactionChain();
+        assertNotNull(txChain);
+
+        /**
+         * We alocate new read-write transaction and write /test
+         * 
+         * 
+         */
+        DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction();
+        assertTestContainerWrite(firstTx);
+
+        /**
+         * First transaction is marked as ready, we are able to allocate chained
+         * transactions
+         */
+        DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready();
+
+        /**
+         * We alocate chained transaction - read transaction, note first one is
+         * still not commited to datastore.
+         */
+        DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+
+        /**
+         * 
+         * We test if we are able to read data from tx, read should not fail
+         * since we are using chained transaction.
+         * 
+         * 
+         */
+        assertTestContainerExists(secondReadTx);
+
+        /**
+         * 
+         * We alocate next transaction, which is still based on first one, but
+         * is read-write.
+         * 
+         */
+        DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction();
+
+        /**
+         * We test existence of /test in third transaction container should
+         * still be visible from first one (which is still uncommmited).
+         * 
+         * 
+         */
+        assertTestContainerExists(thirdDeleteTx);
+
+        /**
+         * We delete node in third transaction
+         */
+        thirdDeleteTx.delete(TestModel.TEST_PATH);
+
+        /**
+         * third transaction is sealed.
+         */
+        DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready();
+
+        /**
+         * We commit first transaction
+         * 
+         */
+        assertThreePhaseCommit(firstWriteTxCohort);
+
+        // Alocates store transacion
+        DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction();
+        /**
+         * We verify transaction is commited to store, container should exists
+         * in datastore.
+         */
+        assertTestContainerExists(storeReadTx);
+        /**
+         * We commit third transaction
+         * 
+         */
+        assertThreePhaseCommit(thirdDeleteTxCohort);
+    }
+
     @Test
     @Ignore
     public void testTransactionConflict() throws InterruptedException, ExecutionException {
@@ -138,32 +215,36 @@ public class InMemoryDataStoreTest {
         assertFalse(txTwo.ready().canCommit().get());
     }
 
-
-
-    private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort) throws InterruptedException, ExecutionException {
+    private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort)
+            throws InterruptedException, ExecutionException {
         assertTrue(cohort.canCommit().get().booleanValue());
         cohort.preCommit().get();
         cohort.commit().get();
     }
 
-
-    private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx) throws InterruptedException, ExecutionException {
+    private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx)
+            throws InterruptedException, ExecutionException {
         /**
-        *
-        * Writes /test in writeTx
-        *
-        */
-       writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+         * 
+         * Writes /test in writeTx
+         * 
+         */
+        writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        return assertTestContainerExists(writeTx);
+    }
+
+    /**
+     * 
+     * Reads /test from readTx Read should return container.
+     * 
+     */
+    private static Optional<NormalizedNode<?, ?>> assertTestContainerExists(DOMStoreReadTransaction readTx)
+            throws InterruptedException, ExecutionException {
 
-       /**
-        *
-        * Reads /test from writeTx
-        * Read should return container.
-        *
-        */
-       ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
-       assertTrue(writeTxContainer.get().isPresent());
-       return writeTxContainer.get();
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = readTx.read(TestModel.TEST_PATH);
+        assertTrue(writeTxContainer.get().isPresent());
+        return writeTxContainer.get();
     }
 
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.