*/
package org.opendaylight.controller.md.sal.common.api.data;
-import java.util.concurrent.Future;
-
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.yangtools.concepts.Path;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import com.google.common.util.concurrent.ListenableFuture;
+
public interface AsyncWriteTransaction<P extends Path<P>, D> extends AsyncTransaction<P, D> {
/**
* Cancels transaction.
* {@link TransactionStatus#FAILED} is reached.
* @throws IllegalStateException if the transaction is not {@link TransactionStatus#NEW}
*/
- public Future<RpcResult<TransactionStatus>> commit();
+ public ListenableFuture<RpcResult<TransactionStatus>> commit();
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+public class DOMDataBrokerImpl implements DOMDataBroker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
+ private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class);
+ private final ImmutableMap<LogicalDatastoreType, DOMStore> datastores;
+ private final ListeningExecutorService executor;
+
+ public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
+ final ListeningExecutorService executor) {
+ super();
+ this.datastores = datastores;
+ this.executor = executor;
+ }
+
+ private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
+
+ @Override
+ public Boolean apply(final Iterable<Boolean> input) {
+
+ for (Boolean value : input) {
+ if (value == false) {
+ return Boolean.FALSE;
+ }
+ }
+ return Boolean.TRUE;
+ }
+ };
+
+ @Override
+ public DOMDataReadTransaction newReadOnlyTransaction() {
+ ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadTransaction> builder = ImmutableMap.builder();
+ for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
+ builder.put(store.getKey(), store.getValue().newReadOnlyTransaction());
+ }
+ return new ReadOnlyTransactionImpl(newTransactionIdentifier(), builder.build());
+ }
+
+ private Object newTransactionIdentifier() {
+ return new Object();
+ }
+
+ @Override
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadWriteTransaction> builder = ImmutableMap.builder();
+ for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
+ builder.put(store.getKey(), store.getValue().newReadWriteTransaction());
+ }
+ return new ReadWriteTransactionImpl(newTransactionIdentifier(), builder.build(), this);
+ }
+
+ @Override
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ ImmutableMap.Builder<LogicalDatastoreType, DOMStoreWriteTransaction> builder = ImmutableMap.builder();
+ for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
+ builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction());
+ }
+ return new WriteTransactionImpl<DOMStoreWriteTransaction>(newTransactionIdentifier(), builder.build(), this);
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
+ final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
+
+ DOMStore potentialStore = datastores.get(store);
+ checkState(potentialStore != null, "Requested logical data store is not available.");
+ return potentialStore.registerChangeListener(path, listener, triggeringScope);
+ }
+
+ private ListenableFuture<RpcResult<TransactionStatus>> submit(
+ final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
+ return executor.submit(new CommitCoordination(transaction));
+ }
+
+ private abstract static class AbstractCompositeTransaction<K, T extends DOMStoreTransaction> implements
+ AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+ private final ImmutableMap<K, T> backingTxs;
+ private final Object identifier;
+
+ protected AbstractCompositeTransaction(final Object identifier, final ImmutableMap<K, T> backingTxs) {
+ this.identifier = checkNotNull(identifier, "Identifier should not be null");
+ this.backingTxs = checkNotNull(backingTxs, "Backing transactions should not be null");
+ }
+
+ protected T getSubtransaction(final K key) {
+ return backingTxs.get(key);
+ }
+
+ public Iterable<T> getSubtransactions() {
+ return backingTxs.values();
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public void close() {
+ try {
+ for (T subtransaction : backingTxs.values()) {
+ subtransaction.close();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Uncaught exception occured during closing transaction.", e);
+ }
+ }
+
+ }
+
+ private static class ReadOnlyTransactionImpl extends
+ AbstractCompositeTransaction<LogicalDatastoreType, DOMStoreReadTransaction> implements
+ DOMDataReadTransaction {
+
+ protected ReadOnlyTransactionImpl(final Object identifier,
+ final ImmutableMap<LogicalDatastoreType, DOMStoreReadTransaction> backingTxs) {
+ super(identifier, backingTxs);
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
+ final InstanceIdentifier path) {
+ return getSubtransaction(store).read(path);
+ }
+
+ }
+
+ private static class WriteTransactionImpl<T extends DOMStoreWriteTransaction> extends
+ AbstractCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
+
+ private final DOMDataBrokerImpl broker;
+ private ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts;
+
+ protected WriteTransactionImpl(final Object identifier, final ImmutableMap<LogicalDatastoreType, T> backingTxs,
+ final DOMDataBrokerImpl broker) {
+ super(identifier, backingTxs);
+ this.broker = broker;
+ }
+
+ public Iterable<DOMStoreThreePhaseCommitCohort> ready() {
+ checkState(cohorts == null, "Transaction was already marked as ready.");
+ ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
+ for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
+ cohortsBuilder.add(subTx.ready());
+ }
+ cohorts = cohortsBuilder.build();
+ return cohorts;
+ }
+
+ protected ImmutableList<DOMStoreThreePhaseCommitCohort> getCohorts() {
+ return cohorts;
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ getSubtransaction(store).write(path, data);
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
+ getSubtransaction(store).delete(path);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
+ // TODO Auto-generated method stub
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public void cancel() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+
+ ready();
+ return broker.submit(this);
+ }
+
+ }
+
+ private static class ReadWriteTransactionImpl extends WriteTransactionImpl<DOMStoreReadWriteTransaction> implements
+ DOMDataReadWriteTransaction {
+
+ protected ReadWriteTransactionImpl(final Object identifier,
+ final ImmutableMap<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
+ final DOMDataBrokerImpl broker) {
+ // super(identifier, backingTxs);
+ super(identifier, backingTxs, broker);
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
+ final InstanceIdentifier path) {
+ return getSubtransaction(store).read(path);
+ }
+ }
+
+ private final class CommitCoordination implements Callable<RpcResult<TransactionStatus>> {
+
+ private final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction;
+
+ public CommitCoordination(final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
+ this.transaction = transaction;
+ }
+
+ @Override
+ public RpcResult<TransactionStatus> call() throws Exception {
+
+ Boolean canCommit = canCommit().get();
+
+ if (canCommit) {
+ try {
+ preCommit().get();
+ try {
+ commit().get();
+ } catch (InterruptedException | ExecutionException e) {
+ // ERROR
+ }
+
+ } catch (InterruptedException | ExecutionException e) {
+ abort().get();
+ }
+ } else {
+ abort().get();
+ }
+ return null;
+ }
+
+ public ListenableFuture<Void> preCommit() {
+ COORDINATOR_LOG.debug("Transaction {}: PreCommit Started ", transaction.getIdentifier());
+ Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+ for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
+ ops.add(cohort.preCommit());
+ }
+ return (ListenableFuture) Futures.allAsList(ops.build());
+ }
+
+ public ListenableFuture<Void> commit() {
+ COORDINATOR_LOG.debug("Transaction {}: Commit Started ", transaction.getIdentifier());
+ Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+ for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
+ ops.add(cohort.commit());
+ }
+ return (ListenableFuture) Futures.allAsList(ops.build());
+ }
+
+ public ListenableFuture<Boolean> canCommit() {
+ COORDINATOR_LOG.debug("Transaction {}: CanCommit Started ", transaction.getIdentifier());
+ Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
+ for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
+ canCommitOperations.add(cohort.canCommit());
+ }
+ ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
+ return Futures.transform(allCanCommits, AND_FUNCTION);
+ }
+
+ public ListenableFuture<Void> abort() {
+ COORDINATOR_LOG.debug("Transaction {}: Abort Started ", transaction.getIdentifier());
+ Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+ for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
+ ops.add(cohort.abort());
+ }
+ return (ListenableFuture) Futures.allAsList(ops.build());
+ };
+
+ }
+
+}
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import com.google.common.base.Optional;
+import com.google.common.primitives.UnsignedLong;
class DataAndMetadataSnapshot {
return new Builder();
}
+ public static DataAndMetadataSnapshot createEmpty() {
+ return createEmpty(new NodeIdentifier(SchemaContext.NAME));
+ }
+
+
public static DataAndMetadataSnapshot createEmpty(final NodeIdentifier rootNode) {
NormalizedNode<?, ?> data = Builders.containerBuilder().withNodeIdentifier(rootNode).build();
StoreMetadataNode metadata = StoreMetadataNode.builder()
+ .setNodeVersion(UnsignedLong.ZERO)
+ .setSubtreeVersion(UnsignedLong.ZERO)
.setData(data)
.build();
return new DataAndMetadataSnapshot(metadata,Optional.<SchemaContext>absent());
NormalizedNode<?, ?> data = Builders.containerBuilder().withNodeIdentifier(rootNodeIdentifier).build();
StoreMetadataNode metadata = StoreMetadataNode.builder()
.setData(data)
+ .setNodeVersion(UnsignedLong.ZERO)
+ .setSubtreeVersion(UnsignedLong.ZERO)
.build();
return new DataAndMetadataSnapshot(metadata, Optional.of(ctx));
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+
+ private final AtomicLong txCounter = new AtomicLong(0);
+
+ private DataAndMetadataSnapshot snapshot;
+ private ModificationApplyOperation operation;
+
+ private final ListeningExecutorService executor;
+ private final String name;
+
+ private SchemaContext schemaContext;
+
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
+ this.executor = executor;
+ this.name = name;
+ this.operation = new AllwaysFailOperation();
+ this.snapshot = DataAndMetadataSnapshot.createEmpty();
+ }
+
+ @Override
+ public String getIdentifier() {
+ return name;
+ }
+
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction() {
+ return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+ }
+
+ @Override
+ public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operation);
+ }
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operation);
+ }
+
+ @Override
+ public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
+ operation = SchemaAwareApplyOperationRoot.from(ctx);
+ schemaContext = ctx;
+ }
+
+ @Override
+ public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+ final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
+ return null;
+ }
+
+ private synchronized DOMStoreThreePhaseCommitCohort submit(
+ final SnaphostBackedWriteTransaction snaphostBackedWriteTransaction) {
+ return new ThreePhaseCommitImpl(snaphostBackedWriteTransaction);
+ }
+
+ private Object nextIdentifier() {
+ return name + "-" + txCounter.getAndIncrement();
+ }
+
+ private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
+
+ private DataAndMetadataSnapshot stableSnapshot;
+ private final Object identifier;
+
+ public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
+ this.identifier = identifier;
+ this.stableSnapshot = snapshot;
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public void close() {
+ stableSnapshot = null;
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
+ checkNotNull(path, "Path must not be null.");
+ checkState(stableSnapshot != null, "Transaction is closed");
+ return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
+ }
+
+ @Override
+ public String toString() {
+ return "SnapshotBackedReadTransaction [id =" + identifier + "]";
+ }
+
+ }
+
+ private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
+
+ private MutableDataTree mutableTree;
+ private final Object identifier;
+ private InMemoryDOMDataStore store;
+
+ private boolean ready = false;
+
+ public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
+ final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
+ this.identifier = identifier;
+ mutableTree = MutableDataTree.from(snapshot, applyOper);
+ this.store = store;
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public void close() {
+ this.mutableTree = null;
+ this.store = null;
+ }
+
+ @Override
+ public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkNotReady();
+ mutableTree.write(path, data);
+ }
+
+ @Override
+ public void delete(final InstanceIdentifier path) {
+ checkNotReady();
+ mutableTree.delete(path);
+ }
+
+ protected boolean isReady() {
+ return ready;
+ }
+
+ protected void checkNotReady() {
+ checkState(!ready, "Transaction is ready. No further modifications allowed.");
+ }
+
+ @Override
+ public synchronized DOMStoreThreePhaseCommitCohort ready() {
+ ready = true;
+ LOG.debug("Store transaction: {} : Ready",getIdentifier());
+ mutableTree.seal();
+ return store.submit(this);
+ }
+
+ protected MutableDataTree getMutatedView() {
+ return mutableTree;
+ }
+
+ @Override
+ public String toString() {
+ return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
+ }
+
+ }
+
+ private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
+ DOMStoreReadWriteTransaction {
+
+ protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
+ final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
+ super(identifier, snapshot, store, applyOper);
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
+ return Futures.immediateFuture(getMutatedView().read(path));
+ }
+
+ @Override
+ public String toString() {
+ return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
+ }
+
+ }
+
+ private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
+
+ private final SnaphostBackedWriteTransaction transaction;
+ private final NodeModification modification;
+
+ private DataAndMetadataSnapshot storeSnapshot;
+ private Optional<StoreMetadataNode> proposedSubtree;
+
+ public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
+ this.transaction = writeTransaction;
+ this.modification = transaction.getMutatedView().getRootModification();
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ final DataAndMetadataSnapshot snapshotCapture = snapshot;
+ final ModificationApplyOperation snapshotOperation = operation;
+
+ return executor.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ boolean applicable = snapshotOperation.isApplicable(modification, Optional.of(snapshotCapture.getMetadataTree()));
+ LOG.debug("Store Transcation: {} : canCommit : {}",transaction.getIdentifier(),applicable);
+ return applicable;
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ storeSnapshot = snapshot;
+ return executor.submit(new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
+ proposedSubtree = operation.apply(modification, Optional.of(metadataTree),increase(metadataTree.getSubtreeVersion()));
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ storeSnapshot = null;
+ proposedSubtree = null;
+ return Futures.<Void> immediateFuture(null);
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ checkState(proposedSubtree != null);
+ checkState(storeSnapshot != null);
+ // return ImmediateFuture<>;
+ InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree);
+ return Futures.<Void> immediateFuture(null);
+ }
+
+ }
+
+ private synchronized void commit(final DataAndMetadataSnapshot storeSnapshot,
+ final Optional<StoreMetadataNode> proposedSubtree) {
+ //LOG.info("Updating Store snaphot.");
+ checkState(snapshot == storeSnapshot, "Store snapshot and transaction snapshot differs");
+ snapshot = DataAndMetadataSnapshot.builder().setMetadataTree(proposedSubtree.get())
+ .setSchemaContext(schemaContext).build();
+ }
+
+ private class AllwaysFailOperation implements ModificationApplyOperation {
+
+ @Override
+ public Optional<StoreMetadataNode> apply(final NodeModification modification,
+ final Optional<StoreMetadataNode> storeMeta,final UnsignedLong subtreeVersion) {
+ throw new IllegalStateException("Schema Context is not available.");
+ }
+
+ @Override
+ public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
+ throw new IllegalStateException("Schema Context is not available.");
+ }
+
+ @Override
+ public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
+ throw new IllegalStateException("Schema Context is not available.");
+ }
+
+ @Override
+ public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
+ throw new IllegalStateException("Schema Context is not available.");
+ }
+
+ }
+}
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import com.google.common.base.Optional;
+import com.google.common.primitives.UnsignedLong;
/**
*
* @param storeMeta
* Store Metadata Node on which NodeModification should be
* applied
+ * @param subtreeVersion New subtree version of parent node
* @throws IllegalArgumentException
* If it is not possible to apply Operation on provided Metadata
* node
* node, {@link Optional#absent()} if {@link NodeModification}
* resulted in deletion of this node.
*/
- Optional<StoreMetadataNode> apply(NodeModification modification, Optional<StoreMetadataNode> storeMeta);
+ Optional<StoreMetadataNode> apply(NodeModification modification, Optional<StoreMetadataNode> storeMeta, UnsignedLong subtreeVersion);
+
+ /**
+ *
+ * Checks if provided node modification could be applied to current metadata node.
+ *
+ * @param modification Modification
+ * @param current Metadata Node to which modification should be applied
+ * @return true if modification is applicable
+ * false if modification is no applicable
+ */
+ boolean isApplicable(NodeModification modification, Optional<StoreMetadataNode> current);
+
+ /**
+ *
+ * Performs structural verification of NodeModification, such as writen values / types
+ * uses right structural elements.
+ *
+ * @param modification to be verified.
+ * @throws IllegalArgumentException If provided NodeModification does not adhere to the structure.
+ */
+ void verifyStructure(NodeModification modification) throws IllegalArgumentException;
/**
* Returns a suboperation for specified tree node
@Override
public Optional<ModificationApplyOperation> getChild(PathArgument child);
+
+
}
final NodeModification rootModification;
final ModificationApplyOperation strategyTree;
- private boolean sealed = false;
+ private boolean sealed = false;
private MutableDataTree(final DataAndMetadataSnapshot snapshot, final ModificationApplyOperation strategyTree) {
this.snapshot = snapshot;
return getModifiedVersion(path, modification);
}
- private Optional<NormalizedNode<?, ?>> getModifiedVersion(final InstanceIdentifier path, final Entry<InstanceIdentifier, NodeModification> modification) {
+ private Optional<NormalizedNode<?, ?>> getModifiedVersion(final InstanceIdentifier path,
+ final Entry<InstanceIdentifier, NodeModification> modification) {
Optional<StoreMetadataNode> result = resolveSnapshot(modification);
- if(result.isPresent()) {
+ if (result.isPresent()) {
NormalizedNode<?, ?> data = result.get().getData();
- return NormalizedNodeUtils.findNode(modification.getKey(),data, path);
+ return NormalizedNodeUtils.findNode(modification.getKey(), data, path);
}
return Optional.absent();
}
- private Optional<StoreMetadataNode> resolveSnapshot(final Entry<InstanceIdentifier, NodeModification> keyModification) {
+ private Optional<StoreMetadataNode> resolveSnapshot(
+ final Entry<InstanceIdentifier, NodeModification> keyModification) {
InstanceIdentifier path = keyModification.getKey();
NodeModification modification = keyModification.getValue();
- return resolveSnapshot(path,modification);
+ return resolveSnapshot(path, modification);
}
- private Optional<StoreMetadataNode> resolveSnapshot(final InstanceIdentifier path, final NodeModification modification) {
+ private Optional<StoreMetadataNode> resolveSnapshot(final InstanceIdentifier path,
+ final NodeModification modification) {
try {
- return resolveModificationStrategy(path).apply(modification,modification.getOriginal());
+ return resolveModificationStrategy(path).apply(modification, modification.getOriginal(),
+ StoreUtils.increase(snapshot.getMetadataTree().getSubtreeVersion()));
} catch (Exception e) {
- log.error("Could not create snapshot for {},",e);
+ log.error("Could not create snapshot for {},", e);
throw e;
}
}
private ModificationApplyOperation resolveModificationStrategy(final InstanceIdentifier path) {
- log.trace("Resolving modification apply strategy for {}",path);
+ log.trace("Resolving modification apply strategy for {}", path);
Optional<ModificationApplyOperation> strategy = TreeNodeUtils.findNode(strategyTree, path);
- checkArgument(strategy.isPresent(),"Provided path %s is not supported by data store. No schema available for it.",path);
+ checkArgument(strategy.isPresent(),
+ "Provided path %s is not supported by data store. No schema available for it.", path);
return strategy.get();
}
- private NodeModification resolveModificationFor(final InstanceIdentifier path) {
+ private OperationWithModification resolveModificationFor(final InstanceIdentifier path) {
NodeModification modification = rootModification;
// We ensure strategy is present.
- resolveModificationStrategy(path);
+ ModificationApplyOperation operation = resolveModificationStrategy(path);
for (PathArgument pathArg : path.getPath()) {
modification = modification.modifyChild(pathArg);
}
- return modification;
+ return OperationWithModification.from(operation, modification);
}
public static MutableDataTree from(final DataAndMetadataSnapshot snapshot, final ModificationApplyOperation resolver) {
}
private void checkSealed() {
- checkState(!sealed , "Data Tree is sealed. No further modifications allowed.");
+ checkState(!sealed, "Data Tree is sealed. No further modifications allowed.");
+ }
+
+ protected NodeModification getRootModification() {
+ return rootModification;
}
}
--- /dev/null
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Optional;
+import com.google.common.primitives.UnsignedLong;
+
+public class OperationWithModification {
+
+ private final NodeModification modification;
+ private final ModificationApplyOperation applyOperation;
+
+ private OperationWithModification(final ModificationApplyOperation op, final NodeModification mod) {
+ this.modification = mod;
+ this.applyOperation = op;
+ }
+
+ public OperationWithModification write(final NormalizedNode<?, ?> value) {
+ modification.write(value);
+ applyOperation.verifyStructure(modification);
+ return this;
+ }
+
+ public OperationWithModification delete() {
+ modification.delete();
+ return this;
+ }
+
+ public boolean isApplicable(final Optional<StoreMetadataNode> data) {
+ return applyOperation.isApplicable(modification, data);
+ }
+
+ public Optional<StoreMetadataNode> apply(final Optional<StoreMetadataNode> data, final UnsignedLong subtreeVersion) {
+ return applyOperation.apply(modification, data, subtreeVersion);
+ }
+
+ public static OperationWithModification from(final ModificationApplyOperation operation,
+ final NodeModification modification) {
+ return new OperationWithModification(operation, modification);
+
+ }
+}
\ No newline at end of file
import java.util.Set;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreNodeCompositeBuilder;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeContainerBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.primitives.UnsignedLong;
return potential.get();
}
+ @Override
+ public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
+ if (modification.getModificationType() == ModificationType.WRITE) {
+ verifyWritenStructure(modification.getWritenValue());
+ }
+ }
+
+ protected abstract void verifyWritenStructure(NormalizedNode<?, ?> writenValue);
+
+ @Override
+ public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> current) {
+ switch (modification.getModificationType()) {
+ case DELETE:
+ return isDeleteApplicable(modification, current);
+ case SUBTREE_MODIFIED:
+ return isSubtreeModificationApplicable(modification, current);
+ case WRITE:
+ return isWriteApplicable(modification, current);
+ case UNMODIFIED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ protected boolean isWriteApplicable(final NodeModification modification, final Optional<StoreMetadataNode> current) {
+ Optional<StoreMetadataNode> original = modification.getOriginal();
+ if (original.isPresent() && current.isPresent()) {
+ return isNotConflicting(original.get(), current.get());
+ } else if (current.isPresent()) {
+ return false;
+ }
+ return true;
+
+ }
+
+ protected final boolean isNotConflicting(final StoreMetadataNode original, final StoreMetadataNode current) {
+ return original.getNodeVersion().equals(current.getNodeVersion())
+ && original.getSubtreeVersion().equals(current.getSubtreeVersion());
+ }
+
+ protected abstract boolean isSubtreeModificationApplicable(final NodeModification modification,
+ final Optional<StoreMetadataNode> current);
+
+ private boolean isDeleteApplicable(final NodeModification modification, final Optional<StoreMetadataNode> current) {
+ // FiXME: Add delete conflict detection.
+ return true;
+ }
+
@Override
public final Optional<StoreMetadataNode> apply(final NodeModification modification,
- final Optional<StoreMetadataNode> currentMeta) {
+ final Optional<StoreMetadataNode> currentMeta, final UnsignedLong subtreeVersion) {
switch (modification.getModificationType()) {
case DELETE:
return Optional.absent();
case SUBTREE_MODIFIED:
- return Optional.of(applySubtreeChange(modification, currentMeta.get()));
+ return Optional.of(applySubtreeChange(modification, currentMeta.get(), subtreeVersion));
case WRITE:
- return Optional.of(applyWrite(modification, currentMeta));
+ return Optional.of(applyWrite(modification, currentMeta, subtreeVersion));
case UNMODIFIED:
return currentMeta;
default:
}
protected abstract StoreMetadataNode applyWrite(NodeModification modification,
- Optional<StoreMetadataNode> currentMeta);
+ Optional<StoreMetadataNode> currentMeta, UnsignedLong subtreeVersion);
- protected abstract StoreMetadataNode applySubtreeChange(NodeModification modification, StoreMetadataNode currentMeta);
+ protected abstract StoreMetadataNode applySubtreeChange(NodeModification modification,
+ StoreMetadataNode currentMeta, UnsignedLong subtreeVersion);
public static abstract class ValueNodeModificationStrategy<T extends DataSchemaNode> extends
SchemaAwareApplyOperation {
private final T schema;
+ private final Class<? extends NormalizedNode<?, ?>> nodeClass;
- protected ValueNodeModificationStrategy(final T schema) {
+ protected ValueNodeModificationStrategy(final T schema, final Class<? extends NormalizedNode<?, ?>> nodeClass) {
super();
this.schema = schema;
+ this.nodeClass = nodeClass;
+ }
+
+ @Override
+ protected void verifyWritenStructure(final NormalizedNode<?, ?> writenValue) {
+ checkArgument(nodeClass.isInstance(writenValue), "Node should must be of type %s", nodeClass);
}
@Override
}
@Override
- protected StoreMetadataNode applySubtreeChange(final NodeModification modification, final StoreMetadataNode currentMeta) {
+ protected StoreMetadataNode applySubtreeChange(final NodeModification modification,
+ final StoreMetadataNode currentMeta, final UnsignedLong subtreeVersion) {
throw new UnsupportedOperationException("Node " + schema.getPath()
+ "is leaf type node. Subtree change is not allowed.");
}
@Override
- protected StoreMetadataNode applyWrite(final NodeModification modification, final Optional<StoreMetadataNode> currentMeta) {
- return StoreMetadataNode.builder()
- // FIXME Add .increaseNodeVersion()
+ protected StoreMetadataNode applyWrite(final NodeModification modification,
+ final Optional<StoreMetadataNode> currentMeta, final UnsignedLong subtreeVersion) {
+ UnsignedLong nodeVersion = subtreeVersion;
+ if (currentMeta.isPresent()) {
+ nodeVersion = StoreUtils.increase(currentMeta.get().getNodeVersion());
+ }
+
+ return StoreMetadataNode.builder().setNodeVersion(nodeVersion).setSubtreeVersion(subtreeVersion)
.setData(modification.getWritenValue()).build();
}
+ @Override
+ protected boolean isSubtreeModificationApplicable(final NodeModification modification,
+ final Optional<StoreMetadataNode> current) {
+ return false;
+ }
+
}
public static class LeafSetEntryModificationStrategy extends ValueNodeModificationStrategy<LeafListSchemaNode> {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
protected LeafSetEntryModificationStrategy(final LeafListSchemaNode schema) {
- super(schema);
+ super(schema, (Class) LeafSetEntryNode.class);
}
}
public static class LeafModificationStrategy extends ValueNodeModificationStrategy<LeafSchemaNode> {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
protected LeafModificationStrategy(final LeafSchemaNode schema) {
- super(schema);
+ super(schema, (Class) LeafNode.class);
}
}
public static abstract class NormalizedNodeContainerModificationStrategy extends SchemaAwareApplyOperation {
+ private final Class<? extends NormalizedNode<?, ?>> nodeClass;
+
+ protected NormalizedNodeContainerModificationStrategy(final Class<? extends NormalizedNode<?, ?>> nodeClass) {
+ this.nodeClass = nodeClass;
+ }
+
+
+ @Override
+ public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
+ if(modification.getModificationType() == ModificationType.WRITE) {
+
+ }
+ for(NodeModification childModification : modification.getModifications()) {
+ resolveChildOperation(childModification.getIdentifier()).verifyStructure(childModification);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected void verifyWritenStructure(final NormalizedNode<?, ?> writenValue) {
+ checkArgument(nodeClass.isInstance(writenValue), "Node should must be of type %s", nodeClass);
+ checkArgument(writenValue instanceof NormalizedNodeContainer);
+ NormalizedNodeContainer writenCont = (NormalizedNodeContainer) writenValue;
+ for(Object child : writenCont.getValue()) {
+ checkArgument(child instanceof NormalizedNode);
+ NormalizedNode childNode = (NormalizedNode) child;
+ }
+ }
+
@Override
- protected StoreMetadataNode applyWrite(final NodeModification modification, final Optional<StoreMetadataNode> currentMeta) {
+ protected StoreMetadataNode applyWrite(final NodeModification modification,
+ final Optional<StoreMetadataNode> currentMeta, final UnsignedLong subtreeVersion) {
//
NormalizedNode<?, ?> newValue = modification.getWritenValue();
- StoreMetadataNode newValueMeta = StoreMetadataNode.createRecursivelly(newValue, UnsignedLong.valueOf(0));
+ UnsignedLong nodeVersion = subtreeVersion;
+ if (currentMeta.isPresent()) {
+ nodeVersion = StoreUtils.increase(currentMeta.get().getNodeVersion());
+ }
+ StoreMetadataNode newValueMeta = StoreMetadataNode.createRecursivelly(newValue, nodeVersion, nodeVersion);
- if(!modification.hasAdditionalModifications()) {
+ if (!modification.hasAdditionalModifications()) {
return newValueMeta;
}
- StoreNodeCompositeBuilder builder = StoreNodeCompositeBuilder.from(newValueMeta,
- createBuilder(modification.getIdentifier()));
+ @SuppressWarnings("rawtypes")
+ NormalizedNodeContainerBuilder dataBuilder = createBuilder(modification.getIdentifier());
+ StoreNodeCompositeBuilder builder = StoreNodeCompositeBuilder.from(dataBuilder) //
+ .setNodeVersion(nodeVersion) //
+ .setSubtreeVersion(subtreeVersion);
- Set<PathArgument> processedPreexisting = applyPreexistingChildren(modification, newValueMeta.getChildren(), builder);
- applyNewChildren(modification, processedPreexisting, builder);
+ Set<PathArgument> processedPreexisting = applyPreexistingChildren(modification, newValueMeta.getChildren(),
+ builder, nodeVersion);
+ applyNewChildren(modification, processedPreexisting, builder, nodeVersion);
return builder.build();
}
@Override
- @SuppressWarnings("rawtypes")
- public StoreMetadataNode applySubtreeChange(final NodeModification modification, final StoreMetadataNode currentMeta) {
-
- StoreNodeCompositeBuilder builder = StoreNodeCompositeBuilder.from(currentMeta,
- createBuilder(modification.getIdentifier()));
- builder.setIdentifier(modification.getIdentifier());
-
+ public StoreMetadataNode applySubtreeChange(final NodeModification modification,
+ final StoreMetadataNode currentMeta, final UnsignedLong subtreeVersion) {
+
+ UnsignedLong updatedSubtreeVersion = StoreUtils.increase(currentMeta.getSubtreeVersion());
+ @SuppressWarnings("rawtypes")
+ NormalizedNodeContainerBuilder dataBuilder = createBuilder(modification.getIdentifier());
+ StoreNodeCompositeBuilder builder = StoreNodeCompositeBuilder.from(dataBuilder)
+ //
+ .setIdentifier(modification.getIdentifier()).setNodeVersion(currentMeta.getNodeVersion())
+ .setSubtreeVersion(updatedSubtreeVersion);
// We process preexisting nodes
- Set<PathArgument> processedPreexisting = applyPreexistingChildren(modification,
- currentMeta.getChildren(), builder);
- applyNewChildren(modification, processedPreexisting, builder);
+ Set<PathArgument> processedPreexisting = applyPreexistingChildren(modification, currentMeta.getChildren(),
+ builder, updatedSubtreeVersion);
+ applyNewChildren(modification, processedPreexisting, builder, updatedSubtreeVersion);
return builder.build();
}
private void applyNewChildren(final NodeModification modification, final Set<PathArgument> ignore,
- final StoreNodeCompositeBuilder builder) {
+ final StoreNodeCompositeBuilder builder, final UnsignedLong subtreeVersion) {
for (NodeModification childModification : modification.getModifications()) {
PathArgument childIdentifier = childModification.getIdentifier();
// We skip allready processed modifications
if (ignore.contains(childIdentifier)) {
continue;
}
- Optional<StoreMetadataNode> childResult = resolveChildOperation(childIdentifier) //
- .apply(childModification, Optional.<StoreMetadataNode> absent());
- if (childResult.isPresent()) {
- builder.add(childResult.get());
- }
+
+ builder.addIfPresent(resolveChildOperation(childIdentifier) //
+ .apply(childModification, Optional.<StoreMetadataNode> absent(), subtreeVersion));
}
}
private Set<PathArgument> applyPreexistingChildren(final NodeModification modification,
- final Iterable<StoreMetadataNode> children, final StoreNodeCompositeBuilder nodeBuilder) {
+ final Iterable<StoreMetadataNode> children, final StoreNodeCompositeBuilder nodeBuilder,
+ final UnsignedLong subtreeVersion) {
Builder<PathArgument> processedModifications = ImmutableSet.<PathArgument> builder();
for (StoreMetadataNode childMeta : children) {
PathArgument childIdentifier = childMeta.getIdentifier();
// Node is modified
if (childModification.isPresent()) {
processedModifications.add(childIdentifier);
- Optional<StoreMetadataNode> change = resolveChildOperation(childIdentifier) //
- .apply(childModification.get(), Optional.of(childMeta));
+ Optional<StoreMetadataNode> result = resolveChildOperation(childIdentifier) //
+ .apply(childModification.get(), Optional.of(childMeta), subtreeVersion);
+ nodeBuilder.addIfPresent(result);
} else {
// Child is unmodified - reuse existing metadata and data
// snapshot
return processedModifications.build();
}
+ @Override
+ protected boolean isSubtreeModificationApplicable(final NodeModification modification,
+ final Optional<StoreMetadataNode> current) {
+ if (false == current.isPresent()) {
+ return false;
+ }
+ boolean result = true;
+ StoreMetadataNode currentMeta = current.get();
+ for (NodeModification childMod : modification.getModifications()) {
+ PathArgument childId = childMod.getIdentifier();
+ Optional<StoreMetadataNode> childMeta = currentMeta.getChild(childId);
+ result &= resolveChildOperation(childId).isApplicable(childMod, childMeta);
+ }
+ return result;
+ }
+
@SuppressWarnings("rawtypes")
protected abstract NormalizedNodeContainerBuilder createBuilder(PathArgument identifier);
}
NormalizedNodeContainerModificationStrategy {
private final T schema;
+ private final Cache<PathArgument, ModificationApplyOperation> childCache = CacheBuilder.newBuilder()
+ .build(CacheLoader.from(new Function<PathArgument, ModificationApplyOperation>() {
+
+ @Override
+ public ModificationApplyOperation apply(final PathArgument identifier) {
+ DataSchemaNode child = schema.getDataChildByName(identifier.getNodeType());
+ if (child == null || child.isAugmenting()) {
+ return null;
+ }
+ return from(child);
+ }
+ }));
- protected DataNodeContainerModificationStrategy(final T schema) {
- super();
+ protected DataNodeContainerModificationStrategy(final T schema,
+ final Class<? extends NormalizedNode<?, ?>> nodeClass) {
+ super(nodeClass);
this.schema = schema;
}
DataNodeContainerModificationStrategy<ContainerSchemaNode> {
public ContainerModificationStrategy(final ContainerSchemaNode schemaNode) {
- super(schemaNode);
+ super(schemaNode, ContainerNode.class);
}
@Override
private final ChoiceNode schema;
public ChoiceModificationStrategy(final ChoiceNode schemaNode) {
+ super(org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode.class);
this.schema = schemaNode;
}
public static class ListEntryModificationStrategy extends DataNodeContainerModificationStrategy<ListSchemaNode> {
protected ListEntryModificationStrategy(final ListSchemaNode schema) {
- super(schema);
+ super(schema, MapEntryNode.class);
}
@Override
private final Optional<ModificationApplyOperation> entryStrategy;
+ @SuppressWarnings({ "unchecked", "rawtypes" })
protected LeafSetModificationStrategy(final LeafListSchemaNode schema) {
+ super((Class) LeafSetNode.class);
entryStrategy = Optional.<ModificationApplyOperation> of(new LeafSetEntryModificationStrategy(schema));
}
+ @SuppressWarnings("rawtypes")
@Override
protected NormalizedNodeContainerBuilder createBuilder(final PathArgument identifier) {
return ImmutableLeafSetNodeBuilder.create().withNodeIdentifier((NodeIdentifier) identifier);
private final Optional<ModificationApplyOperation> entryStrategy;
protected ListMapModificationStrategy(final ListSchemaNode schema) {
+ super(MapNode.class);
entryStrategy = Optional.<ModificationApplyOperation> of(new ListEntryModificationStrategy(schema));
}
+ @SuppressWarnings("rawtypes")
@Override
protected NormalizedNodeContainerBuilder createBuilder(final PathArgument identifier) {
return ImmutableMapNodeBuilder.create().withNodeIdentifier((NodeIdentifier) identifier);
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
private final SchemaContext context;
public SchemaAwareApplyOperationRoot(final SchemaContext context) {
- super(context);
+ super(context,ContainerNode.class);
this.context = context;
}
--- /dev/null
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.primitives.UnsignedLong;
+
+public final class StoreUtils {
+
+
+ public static final UnsignedLong increase(final UnsignedLong original) {
+ return original.plus(UnsignedLong.ONE);
+ }
+
+
+
+
+}
private final Map<PathArgument, StoreMetadataNode> children;
- protected StoreMetadataNode(final NormalizedNode<?, ?> data, final UnsignedLong nodeVersion, final UnsignedLong subtreeVersion,
- final Map<PathArgument, StoreMetadataNode> children) {
+ protected StoreMetadataNode(final NormalizedNode<?, ?> data, final UnsignedLong nodeVersion,
+ final UnsignedLong subtreeVersion, final Map<PathArgument, StoreMetadataNode> children) {
this.nodeVersion = nodeVersion;
this.subtreeVersion = subtreeVersion;
this.data = data;
return Optional.absent();
}
- public static Optional<StoreMetadataNode> getChild(final Optional<StoreMetadataNode> parent, final PathArgument child) {
+ public static Optional<StoreMetadataNode> getChild(final Optional<StoreMetadataNode> parent,
+ final PathArgument child) {
if (parent.isPresent()) {
return parent.get().getChild(child);
}
return Optional.absent();
}
- public static final StoreMetadataNode createRecursivelly(final NormalizedNode<?, ?> node, final UnsignedLong version) {
+ public static final StoreMetadataNode createRecursivelly(final NormalizedNode<?, ?> node,
+ final UnsignedLong nodeVersion, final UnsignedLong subtreeVersion) {
Builder builder = builder() //
- .setNodeVersion(version) //
- .setSubtreeVersion(version) //
+ .setNodeVersion(nodeVersion) //
+ .setSubtreeVersion(subtreeVersion) //
.setData(node);
- if(node instanceof NormalizedNodeContainer<?, ?, ?>) {
+ if (node instanceof NormalizedNodeContainer<?, ?, ?>) {
@SuppressWarnings("unchecked")
NormalizedNodeContainer<?, ?, NormalizedNode<?, ?>> nodeContainer = (NormalizedNodeContainer<?, ?, NormalizedNode<?, ?>>) node;
- for(NormalizedNode<?, ?> subNode : nodeContainer.getValue()) {
- builder.add(createRecursivelly(subNode, version));
+ for (NormalizedNode<?, ?> subNode : nodeContainer.getValue()) {
+ builder.add(createRecursivelly(subNode, nodeVersion, subtreeVersion));
}
}
return builder.build();
public static class Builder {
- private Builder() {
+ private UnsignedLong nodeVersion;
+ private UnsignedLong subtreeVersion;
+ private NormalizedNode<?, ?> data;
+ private final ImmutableMap.Builder<PathArgument, StoreMetadataNode> children = ImmutableMap.builder();
- }
-
- UnsignedLong nodeVersion = UnsignedLong.valueOf(0);
- UnsignedLong subtreeVersion = UnsignedLong.valueOf(0);
- NormalizedNode<?, ?> data;
+ private Builder() {}
- final ImmutableMap.Builder<PathArgument, StoreMetadataNode> children = ImmutableMap.builder();
public UnsignedLong getVersion() {
return nodeVersion;
return this;
}
- public Builder setData(final NormalizedNode<?,?> data) {
+ public Builder setData(final NormalizedNode<?, ?> data) {
this.data = data;
return this;
}
}
public StoreMetadataNode build() {
- checkState(data != null,"Data node should not be null.");
- checkState(subtreeVersion.compareTo(nodeVersion) >= 0, "Subtree version must be equals or greater than node version.");
+ checkState(data != null, "Data node should not be null.");
+ checkState(subtreeVersion.compareTo(nodeVersion) >= 0,
+ "Subtree version must be equals or greater than node version.");
return new StoreMetadataNode(data, nodeVersion, subtreeVersion, children.build());
}
}
+ public static StoreMetadataNode createRecursivelly(final NormalizedNode<?, ?> node, final UnsignedLong version) {
+ return createRecursivelly(node, version, version);
+ }
+
}
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeContainerBuilder;
+import com.google.common.base.Optional;
+import com.google.common.primitives.UnsignedLong;
+
/**
*
* Helper builder
private final NormalizedNodeContainerBuilder data;
-
private StoreNodeCompositeBuilder(final NormalizedNodeContainerBuilder nodeBuilder) {
this.metadata = StoreMetadataNode.builder();
this.data = nodeBuilder;
return this;
}
+ @SuppressWarnings("unchecked")
+ public StoreNodeCompositeBuilder addIfPresent(final Optional<StoreMetadataNode> potential) {
+ if (potential.isPresent()) {
+ StoreMetadataNode node = potential.get();
+ metadata.add(node);
+ data.addChild(node.getData());
+ }
+ return this;
+ }
public StoreMetadataNode build() {
return metadata.setData(data.build()).build();
}
-
public static StoreNodeCompositeBuilder from(final NormalizedNodeContainerBuilder nodeBuilder) {
return new StoreNodeCompositeBuilder(nodeBuilder);
}
- public static StoreNodeCompositeBuilder from(final StoreMetadataNode previous, final NormalizedNodeContainerBuilder nodeBuilder) {
+ @SuppressWarnings("unchecked")
+ public StoreNodeCompositeBuilder setIdentifier(final PathArgument identifier) {
+ data.withNodeIdentifier(identifier);
+ return this;
+ }
- return new StoreNodeCompositeBuilder(nodeBuilder);
+ public StoreNodeCompositeBuilder setNodeVersion(final UnsignedLong nodeVersion) {
+ metadata.setNodeVersion(nodeVersion);
+ return this;
}
- public StoreNodeCompositeBuilder setIdentifier(final PathArgument identifier) {
- data.withNodeIdentifier(identifier);
+ public StoreNodeCompositeBuilder setSubtreeVersion(final UnsignedLong updatedSubtreeVersion) {
+ metadata.setSubtreeVersion(updatedSubtreeVersion);
return this;
}
--- /dev/null
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class DOMBrokerPerformanceTest {
+
+ private static final Logger log = LoggerFactory.getLogger(DOMBrokerPerformanceTest.class);
+
+ private static NormalizedNode<?, ?> outerList(final int i) {
+ return ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i);
+ }
+
+ private static InstanceIdentifier outerListPath(final int i) {
+ return InstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)//
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i) //
+ .build();
+ }
+
+ private SchemaContext schemaContext;
+ private DOMDataBrokerImpl domBroker;
+
+ private static <V> V measure(final String name, final Callable<V> callable) throws Exception {
+ // TODO Auto-generated method stub
+ log.debug("Measurement:{} Start", name);
+ long startNano = System.nanoTime();
+ try {
+ return callable.call();
+ } finally {
+ long endNano = System.nanoTime();
+ log.info("Measurement:\"{}\" Time:{} ms", name, (endNano - startNano) / 1000000.0d);
+ }
+ }
+
+ @Before
+ public void setupStore() {
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ schemaContext = TestModel.createTestContext();
+
+ operStore.onGlobalContextUpdated(schemaContext);
+ configStore.onGlobalContextUpdated(schemaContext);
+
+ ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
+ .put(CONFIGURATION, configStore) //
+ .put(OPERATIONAL, operStore) //
+ .build();
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ domBroker = new DOMDataBrokerImpl(stores, executor);
+ }
+
+ @Test
+ public void testPerformance() throws Exception {
+ measure("Test Suite (all tests)", new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ smallTestSuite(10, 1000);
+ //smallTestSuite(10, 100);
+ smallTestSuite(100, 100);
+ //smallTestSuite(100, 100);
+ //smallTestSuite(1000, 10);
+ smallTestSuite(1000, 10);
+ //smallTestSuite(1000, 1000);
+ return null;
+ }
+ });
+ }
+
+ private void smallTestSuite(final int txNum, final int innerListWriteNum) throws Exception {
+ measure("TestSuite (Txs:" + txNum + " innerWrites:" + innerListWriteNum + ")", new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ measureOneTransactionTopContainer();
+ measureSeparateWritesOneLevel(txNum, innerListWriteNum);
+ return null;
+ }
+ });
+ }
+
+ private void measureSeparateWritesOneLevel(final int txNum, final int innerNum) throws Exception {
+ final List<DOMDataReadWriteTransaction> transactions = measure("Txs:"+ txNum + " Allocate",
+ new Callable<List<DOMDataReadWriteTransaction>>() {
+ @Override
+ public List<DOMDataReadWriteTransaction> call() throws Exception {
+ List<DOMDataReadWriteTransaction> builder = new ArrayList<>(txNum);
+ for (int i = 0; i < txNum; i++) {
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ builder.add(writeTx);
+ }
+ return builder;
+ }
+ });
+ assertEquals(txNum, transactions.size());
+ measure("Txs:"+ txNum + " Writes:1", new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ int i = 0;
+ for (DOMDataReadWriteTransaction writeTx :transactions) {
+ // Writes /test/outer-list/i in writeTx
+ writeTx.put(OPERATIONAL, outerListPath(i), outerList(i));
+ i++;
+ }
+ return null;
+ }
+ });
+
+ measure("Txs:"+ txNum + " Writes:" + innerNum, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ int i = 0;
+ for (DOMDataReadWriteTransaction writeTx :transactions) {
+ // Writes /test/outer-list/i in writeTx
+ InstanceIdentifier path = InstanceIdentifier.builder(outerListPath(i))
+ .node(TestModel.INNER_LIST_QNAME).build();
+ writeTx.put(OPERATIONAL, path, ImmutableNodes.mapNodeBuilder(TestModel.INNER_LIST_QNAME).build());
+ for (int j = 0; j < innerNum; j++) {
+ InstanceIdentifier innerPath = InstanceIdentifier.builder(path)
+ .nodeWithKey(TestModel.INNER_LIST_QNAME, TestModel.NAME_QNAME, String.valueOf(j))
+ .build();
+ writeTx.put(
+ OPERATIONAL,
+ innerPath,
+ ImmutableNodes.mapEntry(TestModel.INNER_LIST_QNAME, TestModel.NAME_QNAME,
+ String.valueOf(j)));
+ }
+ i++;
+ }
+ return null;
+ }
+ });
+
+ measure("Txs:" + txNum + " Submit, Finish", new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ List<ListenableFuture<?>> allFutures = measure(txNum + " Submits",
+ new Callable<List<ListenableFuture<?>>>() {
+ @Override
+ public List<ListenableFuture<?>> call() throws Exception {
+ List<ListenableFuture<?>> builder = new ArrayList<>(txNum);
+ for (DOMDataReadWriteTransaction tx :transactions) {
+ builder.add(tx.commit());
+ }
+ return builder;
+ }
+ });
+ Futures.allAsList(allFutures).get();
+ return null;
+ }
+ });
+
+ final DOMDataReadTransaction readTx = measure("Txs:1 (ro), Allocate", new Callable<DOMDataReadTransaction>() {
+ @Override
+ public DOMDataReadTransaction call() throws Exception {
+ return domBroker.newReadOnlyTransaction();
+
+ }
+ });
+
+
+ measure("Txs:1 (ro) Reads:" + txNum + " (1-level)" , new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (int i = 0; i < txNum; i++) {
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> potential = readTx.read(OPERATIONAL,
+ outerListPath(i));
+ assertTrue("outerList/" + i, potential.get().isPresent());
+ }
+ return null;
+ }
+ });
+
+ measure("Txs:1 (ro) Reads:" + txNum * innerNum + " (2-level)", new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (int i = 0; i < txNum; i++) {
+ for (int j = 0; j < innerNum; j++) {
+ InstanceIdentifier path = InstanceIdentifier
+ .builder(outerListPath(i))
+ //
+ .node(TestModel.INNER_LIST_QNAME)
+ .nodeWithKey(TestModel.INNER_LIST_QNAME, TestModel.NAME_QNAME, String.valueOf(j))
+ .build();
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> potential = readTx.read(OPERATIONAL, path);
+ assertTrue("outer-list/" + i + "/inner-list/" + j, potential.get().isPresent());
+ }
+ }
+ return null;
+ }
+ });
+ }
+
+ private void measureOneTransactionTopContainer() throws Exception {
+
+ final DOMDataReadWriteTransaction writeTx = measure("Txs:1 Allocate", new Callable<DOMDataReadWriteTransaction>() {
+ @Override
+ public DOMDataReadWriteTransaction call() throws Exception {
+ return domBroker.newReadWriteTransaction();
+ }
+ });
+
+ measure("Txs:1 Write", new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeTx.put(OPERATIONAL, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+ return null;
+ }
+ });
+
+ measure("Txs:1 Reads:1", new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // Reads /test in writeTx
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
+ TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+ return null;
+ }
+ });
+
+ measure("Txs:1 Reads:1", new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // Reads /test in writeTx
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
+ TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+ return null;
+ }
+ });
+
+ measure("Txs:1 Submit, Finish", new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ measure("Txs:1 Submit", new Callable<ListenableFuture<?>>() {
+ @Override
+ public ListenableFuture<?> call() throws Exception {
+ return writeTx.commit();
+ }
+ }).get();
+ return null;
+ }
+ });
+ }
+}
--- /dev/null
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class DOMBrokerTest {
+
+ private SchemaContext schemaContext;
+ private DOMDataBrokerImpl domBroker;
+
+ @Before
+ public void setupStore() {
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ schemaContext = TestModel.createTestContext();
+
+ operStore.onGlobalContextUpdated(schemaContext);
+ configStore.onGlobalContextUpdated(schemaContext);
+
+ ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
+ .put(CONFIGURATION, configStore) //
+ .put(OPERATIONAL, operStore) //
+ .build();
+
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ domBroker = new DOMDataBrokerImpl(stores, executor);
+ }
+
+ @Test
+ public void testTransactionIsolation() throws InterruptedException, ExecutionException {
+
+ assertNotNull(domBroker);
+
+ DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
+ assertNotNull(readTx);
+
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ assertNotNull(writeTx);
+ /**
+ *
+ * Writes /test in writeTx
+ *
+ */
+ writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ /**
+ *
+ * Reads /test from writeTx Read should return container.
+ *
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
+ TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+
+ /**
+ *
+ * Reads /test from readTx Read should return Absent.
+ *
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
+ .read(OPERATIONAL, TestModel.TEST_PATH);
+ assertFalse(readTxContainer.get().isPresent());
+ }
+
+ @Test
+ public void testTransactionCommit() throws InterruptedException, ExecutionException {
+
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ assertNotNull(writeTx);
+ /**
+ *
+ * Writes /test in writeTx
+ *
+ */
+ writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ /**
+ *
+ * Reads /test from writeTx Read should return container.
+ *
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
+ TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+
+ writeTx.commit().get();
+
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
+ .read(OPERATIONAL, TestModel.TEST_PATH).get();
+ assertTrue(afterCommitRead.isPresent());
+ }
+
+
+
+}
--- /dev/null
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class InMemoryDataStoreTest {
+
+ private SchemaContext schemaContext;
+ private InMemoryDOMDataStore domStore;
+
+
+ @Before
+ public void setupStore() {
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+ schemaContext = TestModel.createTestContext();
+ domStore.onGlobalContextUpdated(schemaContext);
+
+ }
+
+
+ @Test
+ public void testTransactionIsolation() throws InterruptedException, ExecutionException {
+
+ assertNotNull(domStore);
+
+
+ DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
+ assertNotNull(readTx);
+
+ DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+ assertNotNull(writeTx);
+ /**
+ *
+ * Writes /test in writeTx
+ *
+ */
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ /**
+ *
+ * Reads /test from writeTx
+ * Read should return container.
+ *
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+
+ /**
+ *
+ * Reads /test from readTx
+ * Read should return Absent.
+ *
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx.read(TestModel.TEST_PATH);
+ assertFalse(readTxContainer.get().isPresent());
+ }
+
+ @Test
+ public void testTransactionCommit() throws InterruptedException, ExecutionException {
+
+ DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+ assertNotNull(writeTx);
+ /**
+ *
+ * Writes /test in writeTx
+ *
+ */
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ /**
+ *
+ * Reads /test from writeTx
+ * Read should return container.
+ *
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+
+ DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+
+ assertThreePhaseCommit(cohort);
+
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH).get();
+ assertTrue(afterCommitRead.isPresent());
+ }
+
+ @Test
+ public void testTransactionAbort() throws InterruptedException, ExecutionException {
+
+ DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+ assertNotNull(writeTx);
+
+ assertTestContainerWrite(writeTx);
+
+ DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+
+ assertTrue(cohort.canCommit().get().booleanValue());
+ cohort.preCommit().get();
+ cohort.abort().get();
+
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH).get();
+ assertFalse(afterCommitRead.isPresent());
+ }
+
+ @Test
+ public void testTransactionConflict() throws InterruptedException, ExecutionException {
+ DOMStoreReadWriteTransaction txOne = domStore.newReadWriteTransaction();
+ DOMStoreReadWriteTransaction txTwo = domStore.newReadWriteTransaction();
+ assertTestContainerWrite(txOne);
+ assertTestContainerWrite(txTwo);
+
+ /**
+ * Commits transaction
+ */
+ assertThreePhaseCommit(txOne.ready());
+
+ /**
+ * Asserts that txTwo could not be commited
+ */
+ assertFalse(txTwo.ready().canCommit().get());
+ }
+
+
+
+ private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort) throws InterruptedException, ExecutionException {
+ assertTrue(cohort.canCommit().get().booleanValue());
+ cohort.preCommit().get();
+ cohort.commit().get();
+ }
+
+
+ private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx) throws InterruptedException, ExecutionException {
+ /**
+ *
+ * Writes /test in writeTx
+ *
+ */
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ /**
+ *
+ * Reads /test from writeTx
+ * Read should return container.
+ *
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+ return writeTxContainer.get();
+ }
+
+}
.withChild(BAR_NODE).build()).build();
}
- private StoreMetadataNode createDocumentOneMetadata() {
- UnsignedLong version = UnsignedLong.valueOf(0);
- return StoreMetadataNode.createRecursivelly(createDocumentOne(), version);
- }
-
@Test
public void basicReadWrites() {
MutableDataTree modificationTree = MutableDataTree.from(
DataAndMetadataSnapshot.builder() //
- .setMetadataTree(createDocumentOneMetadata()) //
+ .setMetadataTree(StoreMetadataNode.createRecursivelly(createDocumentOne(), UnsignedLong.valueOf(5))) //
.setSchemaContext(schemaContext) //
.build(), new SchemaAwareApplyOperationRoot(schemaContext));
Optional<NormalizedNode<?, ?>> originalBarNode = modificationTree.read(OUTER_LIST_2_PATH);