From: Tony Tkacik Date: Mon, 26 May 2014 21:29:53 +0000 (+0200) Subject: Bug 1073: Added Transaction Chain support to InMemoryDataTreeModification. X-Git-Tag: release/helium~689^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f7e2efa41f9c6be3282204c72817490b1e6b2cb2 Bug 1073: Added Transaction Chain support to InMemoryDataTreeModification. Added support for chaining transactions to underlying implementation. Added test case testTransactionChain, which is documented and illustrates basic behaviour of transaction chain - local virtual memory, which allows to allocate new transactions which provides view as-if previous transaction already happened. Change-Id: I23f5622f9a6498356b3c54d53e0111f3fba33bf5 Signed-off-by: Tony Tkacik --- diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 2495146aa6..25e6d04721 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -13,6 +13,8 @@ import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; + import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; @@ -41,20 +43,22 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; /** * In-memory DOM Data Store - * - * Implementation of {@link DOMStore} which uses {@link DataTree} - * and other classes such as {@link SnapshotBackedWriteTransaction}. + * + * Implementation of {@link DOMStore} which uses {@link DataTree} and other + * classes such as {@link SnapshotBackedWriteTransaction}. * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask} * to implement {@link DOMStore} contract. - * + * */ -public class InMemoryDOMDataStore implements DOMStore, Identifiable, SchemaContextListener, TransactionReadyPrototype { +public class InMemoryDOMDataStore implements DOMStore, Identifiable, SchemaContextListener, + TransactionReadyPrototype { private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class); private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(); private final ListenerTree listenerTree = ListenerTree.create(); @@ -104,7 +108,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch /* * Make sure commit is not occurring right now. Listener has to be * registered and its state capture enqueued at a consistent point. - * + * * FIXME: improve this to read-write lock, such that multiple listener * registrations can occur simultaneously */ @@ -148,14 +152,22 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype { - private SnapshotBackedWriteTransaction previousOutstandingTx; + @GuardedBy("this") + private SnapshotBackedWriteTransaction latestOutstandingTx; + + private boolean chainFailed = false; + + private void checkFailed() { + Preconditions.checkState(!chainFailed, "Transaction chain is failed."); + } @Override public synchronized DOMStoreReadTransaction newReadOnlyTransaction() { final DataTreeSnapshot snapshot; - if(previousOutstandingTx != null) { - checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready."); - snapshot = previousOutstandingTx.getMutatedView(); + checkFailed(); + if (latestOutstandingTx != null) { + checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready."); + snapshot = latestOutstandingTx.getMutatedView(); } else { snapshot = dataTree.takeSnapshot(); } @@ -165,42 +177,112 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() { final DataTreeSnapshot snapshot; - if(previousOutstandingTx != null) { - checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready."); - snapshot = previousOutstandingTx.getMutatedView(); + checkFailed(); + if (latestOutstandingTx != null) { + checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready."); + snapshot = latestOutstandingTx.getMutatedView(); } else { - snapshot = dataTree.takeSnapshot().newModification(); + snapshot = dataTree.takeSnapshot(); } - SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot,this); + final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(), + snapshot, this); + latestOutstandingTx = ret; return ret; } @Override public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() { final DataTreeSnapshot snapshot; - if(previousOutstandingTx != null) { - checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready."); - snapshot = previousOutstandingTx.getMutatedView(); + checkFailed(); + if (latestOutstandingTx != null) { + checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready."); + snapshot = latestOutstandingTx.getMutatedView(); } else { - snapshot = dataTree.takeSnapshot().newModification(); + snapshot = dataTree.takeSnapshot(); } - SnapshotBackedWriteTransaction ret =new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,this); + final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot, + this); + latestOutstandingTx = ret; return ret; } @Override public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) { DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx); - // FIXME: We probably want to add Transaction Chain cohort - return storeCohort; + return new ChainedTransactionCommitImpl(tx, storeCohort, this); } @Override public void close() { - // TODO Auto-generated method stub } + protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, + final Throwable t) { + chainFailed = true; + + } + + public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) { + // If commited transaction is latestOutstandingTx we clear + // latestOutstandingTx + // field in order to base new transactions on Datastore Data Tree + // directly. + if (transaction.equals(latestOutstandingTx)) { + latestOutstandingTx = null; + } + } + + } + + private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort { + + private final SnapshotBackedWriteTransaction transaction; + private final DOMStoreThreePhaseCommitCohort delegate; + + private final DOMStoreTransactionChainImpl txChain; + + protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction, + final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) { + super(); + this.transaction = transaction; + this.delegate = delegate; + this.txChain = txChain; + } + + @Override + public ListenableFuture canCommit() { + return delegate.canCommit(); + } + + @Override + public ListenableFuture preCommit() { + return delegate.preCommit(); + } + + @Override + public ListenableFuture abort() { + return delegate.abort(); + } + + @Override + public ListenableFuture commit() { + ListenableFuture commitFuture = delegate.commit(); + Futures.addCallback(commitFuture, new FutureCallback() { + @Override + public void onFailure(final Throwable t) { + txChain.onTransactionFailed(transaction, t); + } + + @Override + public void onSuccess(final Void result) { + txChain.onTransactionCommited(transaction); + } + + }); + return commitFuture; + } + } private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort { @@ -226,7 +308,8 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier()); return true; } catch (DataPreconditionFailedException e) { - LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e); + LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(), + e.getPath(), e); return false; } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/data/InMemoryDataTreeModification.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/data/InMemoryDataTreeModification.java index f7e95b84bd..39ff4f0aa0 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/data/InMemoryDataTreeModification.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/data/InMemoryDataTreeModification.java @@ -12,6 +12,7 @@ import java.util.Map.Entry; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeModification; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType; import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils; import org.opendaylight.controller.md.sal.dom.store.impl.tree.spi.TreeNode; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; @@ -151,7 +152,27 @@ final class InMemoryDataTreeModification implements DataTreeModification { public synchronized DataTreeModification newModification() { Preconditions.checkState(sealed, "Attempted to chain on an unsealed modification"); - // FIXME: transaction chaining - throw new UnsupportedOperationException("Implement this as part of transaction chaining"); + if(rootNode.getType() == ModificationType.UNMODIFIED) { + return snapshot.newModification(); + } + + /* + * FIXME: Add advanced transaction chaining for modification of not rebased + * modification. + * + * Current computation of tempRoot may yeld incorrect subtree versions + * if there are multiple concurrent transactions, which may break + * versioning preconditions for modification of previously occured write, + * directly nested under parent node, since node version is derived from + * subtree version. + * + * For deeper nodes subtree version is derived from their respective metadata + * nodes, so this incorrect root subtree version is not affecting us. + */ + TreeNode originalSnapshotRoot = snapshot.getRootNode(); + Optional tempRoot = strategyTree.apply(rootNode, Optional.of(originalSnapshotRoot), originalSnapshotRoot.getSubtreeVersion().next()); + + InMemoryDataTreeSnapshot tempTree = new InMemoryDataTreeSnapshot(snapshot.getSchemaContext(), tempRoot.get(), strategyTree); + return tempTree.newModification(); } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java index c0f0a35565..94ac8d652e 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java @@ -12,6 +12,7 @@ import org.junit.Test; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -25,7 +26,6 @@ public class InMemoryDataStoreTest { private SchemaContext schemaContext; private InMemoryDOMDataStore domStore; - @Before public void setupStore() { domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor()); @@ -34,40 +34,36 @@ public class InMemoryDataStoreTest { } - @Test public void testTransactionIsolation() throws InterruptedException, ExecutionException { assertNotNull(domStore); - DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction(); assertNotNull(readTx); DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction(); assertNotNull(writeTx); /** - * + * * Writes /test in writeTx - * + * */ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); /** - * - * Reads /test from writeTx - * Read should return container. - * + * + * Reads /test from writeTx Read should return container. + * */ ListenableFuture>> writeTxContainer = writeTx.read(TestModel.TEST_PATH); assertTrue(writeTxContainer.get().isPresent()); /** - * - * Reads /test from readTx - * Read should return Absent. - * - */ + * + * Reads /test from readTx Read should return Absent. + * + */ ListenableFuture>> readTxContainer = readTx.read(TestModel.TEST_PATH); assertFalse(readTxContainer.get().isPresent()); } @@ -78,17 +74,16 @@ public class InMemoryDataStoreTest { DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction(); assertNotNull(writeTx); /** - * + * * Writes /test in writeTx - * + * */ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); /** - * - * Reads /test from writeTx - * Read should return container. - * + * + * Reads /test from writeTx Read should return container. + * */ ListenableFuture>> writeTxContainer = writeTx.read(TestModel.TEST_PATH); assertTrue(writeTxContainer.get().isPresent()); @@ -97,7 +92,8 @@ public class InMemoryDataStoreTest { assertThreePhaseCommit(cohort); - Optional> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH).get(); + Optional> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH) + .get(); assertTrue(afterCommitRead.isPresent()); } @@ -115,10 +111,91 @@ public class InMemoryDataStoreTest { cohort.preCommit().get(); cohort.abort().get(); - Optional> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH).get(); + Optional> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH) + .get(); assertFalse(afterCommitRead.isPresent()); } + @Test + public void testTransactionChain() throws InterruptedException, ExecutionException { + DOMStoreTransactionChain txChain = domStore.createTransactionChain(); + assertNotNull(txChain); + + /** + * We alocate new read-write transaction and write /test + * + * + */ + DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction(); + assertTestContainerWrite(firstTx); + + /** + * First transaction is marked as ready, we are able to allocate chained + * transactions + */ + DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready(); + + /** + * We alocate chained transaction - read transaction, note first one is + * still not commited to datastore. + */ + DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction(); + + /** + * + * We test if we are able to read data from tx, read should not fail + * since we are using chained transaction. + * + * + */ + assertTestContainerExists(secondReadTx); + + /** + * + * We alocate next transaction, which is still based on first one, but + * is read-write. + * + */ + DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction(); + + /** + * We test existence of /test in third transaction container should + * still be visible from first one (which is still uncommmited). + * + * + */ + assertTestContainerExists(thirdDeleteTx); + + /** + * We delete node in third transaction + */ + thirdDeleteTx.delete(TestModel.TEST_PATH); + + /** + * third transaction is sealed. + */ + DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready(); + + /** + * We commit first transaction + * + */ + assertThreePhaseCommit(firstWriteTxCohort); + + // Alocates store transacion + DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction(); + /** + * We verify transaction is commited to store, container should exists + * in datastore. + */ + assertTestContainerExists(storeReadTx); + /** + * We commit third transaction + * + */ + assertThreePhaseCommit(thirdDeleteTxCohort); + } + @Test @Ignore public void testTransactionConflict() throws InterruptedException, ExecutionException { @@ -138,32 +215,36 @@ public class InMemoryDataStoreTest { assertFalse(txTwo.ready().canCommit().get()); } - - - private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort) throws InterruptedException, ExecutionException { + private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort) + throws InterruptedException, ExecutionException { assertTrue(cohort.canCommit().get().booleanValue()); cohort.preCommit().get(); cohort.commit().get(); } - - private static Optional> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx) throws InterruptedException, ExecutionException { + private static Optional> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx) + throws InterruptedException, ExecutionException { /** - * - * Writes /test in writeTx - * - */ - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + * + * Writes /test in writeTx + * + */ + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + return assertTestContainerExists(writeTx); + } + + /** + * + * Reads /test from readTx Read should return container. + * + */ + private static Optional> assertTestContainerExists(DOMStoreReadTransaction readTx) + throws InterruptedException, ExecutionException { - /** - * - * Reads /test from writeTx - * Read should return container. - * - */ - ListenableFuture>> writeTxContainer = writeTx.read(TestModel.TEST_PATH); - assertTrue(writeTxContainer.get().isPresent()); - return writeTxContainer.get(); + ListenableFuture>> writeTxContainer = readTx.read(TestModel.TEST_PATH); + assertTrue(writeTxContainer.get().isPresent()); + return writeTxContainer.get(); } }