import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
+
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
/**
* In-memory DOM Data Store
- *
- * Implementation of {@link DOMStore} which uses {@link DataTree}
- * and other classes such as {@link SnapshotBackedWriteTransaction}.
+ *
+ * Implementation of {@link DOMStore} which uses {@link DataTree} and other
+ * classes such as {@link SnapshotBackedWriteTransaction}.
* {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
* to implement {@link DOMStore} contract.
- *
+ *
*/
-public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener, TransactionReadyPrototype {
+public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
+ TransactionReadyPrototype {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
/*
* Make sure commit is not occurring right now. Listener has to be
* registered and its state capture enqueued at a consistent point.
- *
+ *
* FIXME: improve this to read-write lock, such that multiple listener
* registrations can occur simultaneously
*/
private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
- private SnapshotBackedWriteTransaction previousOutstandingTx;
+ @GuardedBy("this")
+ private SnapshotBackedWriteTransaction latestOutstandingTx;
+
+ private boolean chainFailed = false;
+
+ private void checkFailed() {
+ Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
+ }
@Override
public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
final DataTreeSnapshot snapshot;
- if(previousOutstandingTx != null) {
- checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = previousOutstandingTx.getMutatedView();
+ checkFailed();
+ if (latestOutstandingTx != null) {
+ checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+ snapshot = latestOutstandingTx.getMutatedView();
} else {
snapshot = dataTree.takeSnapshot();
}
@Override
public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
final DataTreeSnapshot snapshot;
- if(previousOutstandingTx != null) {
- checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = previousOutstandingTx.getMutatedView();
+ checkFailed();
+ if (latestOutstandingTx != null) {
+ checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+ snapshot = latestOutstandingTx.getMutatedView();
} else {
- snapshot = dataTree.takeSnapshot().newModification();
+ snapshot = dataTree.takeSnapshot();
}
- SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot,this);
+ final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
+ snapshot, this);
+ latestOutstandingTx = ret;
return ret;
}
@Override
public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
final DataTreeSnapshot snapshot;
- if(previousOutstandingTx != null) {
- checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = previousOutstandingTx.getMutatedView();
+ checkFailed();
+ if (latestOutstandingTx != null) {
+ checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+ snapshot = latestOutstandingTx.getMutatedView();
} else {
- snapshot = dataTree.takeSnapshot().newModification();
+ snapshot = dataTree.takeSnapshot();
}
- SnapshotBackedWriteTransaction ret =new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,this);
+ final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
+ this);
+ latestOutstandingTx = ret;
return ret;
}
@Override
public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
- // FIXME: We probably want to add Transaction Chain cohort
- return storeCohort;
+ return new ChainedTransactionCommitImpl(tx, storeCohort, this);
}
@Override
public void close() {
- // TODO Auto-generated method stub
}
+ protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
+ final Throwable t) {
+ chainFailed = true;
+
+ }
+
+ public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
+ // If commited transaction is latestOutstandingTx we clear
+ // latestOutstandingTx
+ // field in order to base new transactions on Datastore Data Tree
+ // directly.
+ if (transaction.equals(latestOutstandingTx)) {
+ latestOutstandingTx = null;
+ }
+ }
+
+ }
+
+ private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
+
+ private final SnapshotBackedWriteTransaction transaction;
+ private final DOMStoreThreePhaseCommitCohort delegate;
+
+ private final DOMStoreTransactionChainImpl txChain;
+
+ protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
+ final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
+ super();
+ this.transaction = transaction;
+ this.delegate = delegate;
+ this.txChain = txChain;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return delegate.canCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return delegate.preCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate.abort();
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ ListenableFuture<Void> commitFuture = delegate.commit();
+ Futures.addCallback(commitFuture, new FutureCallback<Void>() {
+ @Override
+ public void onFailure(final Throwable t) {
+ txChain.onTransactionFailed(transaction, t);
+ }
+
+ @Override
+ public void onSuccess(final Void result) {
+ txChain.onTransactionCommited(transaction);
+ }
+
+ });
+ return commitFuture;
+ }
+
}
private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
return true;
} catch (DataPreconditionFailedException e) {
- LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e);
+ LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
+ e.getPath(), e);
return false;
}
}
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeModification;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.spi.TreeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
public synchronized DataTreeModification newModification() {
Preconditions.checkState(sealed, "Attempted to chain on an unsealed modification");
- // FIXME: transaction chaining
- throw new UnsupportedOperationException("Implement this as part of transaction chaining");
+ if(rootNode.getType() == ModificationType.UNMODIFIED) {
+ return snapshot.newModification();
+ }
+
+ /*
+ * FIXME: Add advanced transaction chaining for modification of not rebased
+ * modification.
+ *
+ * Current computation of tempRoot may yeld incorrect subtree versions
+ * if there are multiple concurrent transactions, which may break
+ * versioning preconditions for modification of previously occured write,
+ * directly nested under parent node, since node version is derived from
+ * subtree version.
+ *
+ * For deeper nodes subtree version is derived from their respective metadata
+ * nodes, so this incorrect root subtree version is not affecting us.
+ */
+ TreeNode originalSnapshotRoot = snapshot.getRootNode();
+ Optional<TreeNode> tempRoot = strategyTree.apply(rootNode, Optional.of(originalSnapshotRoot), originalSnapshotRoot.getSubtreeVersion().next());
+
+ InMemoryDataTreeSnapshot tempTree = new InMemoryDataTreeSnapshot(snapshot.getSchemaContext(), tempRoot.get(), strategyTree);
+ return tempTree.newModification();
}
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private SchemaContext schemaContext;
private InMemoryDOMDataStore domStore;
-
@Before
public void setupStore() {
domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
}
-
@Test
public void testTransactionIsolation() throws InterruptedException, ExecutionException {
assertNotNull(domStore);
-
DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
assertNotNull(readTx);
DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
assertNotNull(writeTx);
/**
- *
+ *
* Writes /test in writeTx
- *
+ *
*/
writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
- *
- * Reads /test from writeTx
- * Read should return container.
- *
+ *
+ * Reads /test from writeTx Read should return container.
+ *
*/
ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
assertTrue(writeTxContainer.get().isPresent());
/**
- *
- * Reads /test from readTx
- * Read should return Absent.
- *
- */
+ *
+ * Reads /test from readTx Read should return Absent.
+ *
+ */
ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx.read(TestModel.TEST_PATH);
assertFalse(readTxContainer.get().isPresent());
}
DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
assertNotNull(writeTx);
/**
- *
+ *
* Writes /test in writeTx
- *
+ *
*/
writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
- *
- * Reads /test from writeTx
- * Read should return container.
- *
+ *
+ * Reads /test from writeTx Read should return container.
+ *
*/
ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
assertTrue(writeTxContainer.get().isPresent());
assertThreePhaseCommit(cohort);
- Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH).get();
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
+ .get();
assertTrue(afterCommitRead.isPresent());
}
cohort.preCommit().get();
cohort.abort().get();
- Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH).get();
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
+ .get();
assertFalse(afterCommitRead.isPresent());
}
+ @Test
+ public void testTransactionChain() throws InterruptedException, ExecutionException {
+ DOMStoreTransactionChain txChain = domStore.createTransactionChain();
+ assertNotNull(txChain);
+
+ /**
+ * We alocate new read-write transaction and write /test
+ *
+ *
+ */
+ DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction();
+ assertTestContainerWrite(firstTx);
+
+ /**
+ * First transaction is marked as ready, we are able to allocate chained
+ * transactions
+ */
+ DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready();
+
+ /**
+ * We alocate chained transaction - read transaction, note first one is
+ * still not commited to datastore.
+ */
+ DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+
+ /**
+ *
+ * We test if we are able to read data from tx, read should not fail
+ * since we are using chained transaction.
+ *
+ *
+ */
+ assertTestContainerExists(secondReadTx);
+
+ /**
+ *
+ * We alocate next transaction, which is still based on first one, but
+ * is read-write.
+ *
+ */
+ DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction();
+
+ /**
+ * We test existence of /test in third transaction container should
+ * still be visible from first one (which is still uncommmited).
+ *
+ *
+ */
+ assertTestContainerExists(thirdDeleteTx);
+
+ /**
+ * We delete node in third transaction
+ */
+ thirdDeleteTx.delete(TestModel.TEST_PATH);
+
+ /**
+ * third transaction is sealed.
+ */
+ DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready();
+
+ /**
+ * We commit first transaction
+ *
+ */
+ assertThreePhaseCommit(firstWriteTxCohort);
+
+ // Alocates store transacion
+ DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction();
+ /**
+ * We verify transaction is commited to store, container should exists
+ * in datastore.
+ */
+ assertTestContainerExists(storeReadTx);
+ /**
+ * We commit third transaction
+ *
+ */
+ assertThreePhaseCommit(thirdDeleteTxCohort);
+ }
+
@Test
@Ignore
public void testTransactionConflict() throws InterruptedException, ExecutionException {
assertFalse(txTwo.ready().canCommit().get());
}
-
-
- private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort) throws InterruptedException, ExecutionException {
+ private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort)
+ throws InterruptedException, ExecutionException {
assertTrue(cohort.canCommit().get().booleanValue());
cohort.preCommit().get();
cohort.commit().get();
}
-
- private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx) throws InterruptedException, ExecutionException {
+ private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx)
+ throws InterruptedException, ExecutionException {
/**
- *
- * Writes /test in writeTx
- *
- */
- writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ *
+ * Writes /test in writeTx
+ *
+ */
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ return assertTestContainerExists(writeTx);
+ }
+
+ /**
+ *
+ * Reads /test from readTx Read should return container.
+ *
+ */
+ private static Optional<NormalizedNode<?, ?>> assertTestContainerExists(DOMStoreReadTransaction readTx)
+ throws InterruptedException, ExecutionException {
- /**
- *
- * Reads /test from writeTx
- * Read should return container.
- *
- */
- ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
- assertTrue(writeTxContainer.get().isPresent());
- return writeTxContainer.get();
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = readTx.read(TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+ return writeTxContainer.get();
}
}