<feature version='[3.3.0,4.0.0)'>odl-lmax</feature>
<!-- FIXME: Bug 4202: Add MD-SAL provided odl-mdsal-binding-adapter -->
<!-- FIXME: Bug 4202: Add MD-SAL provided odl-mdsal-dom-broker -->
+ <feature version='${mdsal.version}'>odl-mdsal-dom</feature>
<feature version='${mdsal.version}'>odl-mdsal-eos-dom</feature>
<feature version='${mdsal.version}'>odl-mdsal-eos-binding</feature>
<feature version='${mdsal.version}'>odl-mdsal-singleton-dom</feature>
<groupId>org.opendaylight.mdsal</groupId>
<artifactId>mdsal-eos-dom-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-dom-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-dom-broker</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-common-impl</artifactId>
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataBrokerAdapter;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+
+/**
+ * DOMDataBroker implementation that forwards calls to {@link org.opendaylight.mdsal.dom.broker.ShardedDOMDataBrokerAdapter},
+ * which in turn translates calls to shard aware implementation of {@link org.opendaylight.mdsal.dom.api.DOMDataTreeService}
+ * <p>
+ * The incompatibility between first and latter APIs, puts restriction on {@link DOMDataReadWriteTransaction}
+ * and {@link DOMDataReadOnlyTransaction} provided by this data broker. See {@link ShardedDOMDataBrokerDelegatingReadWriteTransaction}
+ * and {@link ShardedDOMDataBrokerDelegatingReadTransaction} respectively.
+ */
+// FIXME try to refactor some of the implementation to abstract class for better reusability
+public class LegacyShardedDOMDataBrokerAdapter implements DOMDataBroker {
+
+ private final org.opendaylight.mdsal.dom.api.DOMDataBroker delegateDataBroker;
+ private final SchemaService schemaService;
+ private final AtomicLong txNum = new AtomicLong();
+ private final AtomicLong chainNum = new AtomicLong();
+
+ public LegacyShardedDOMDataBrokerAdapter(final ShardedDOMDataBrokerAdapter delegateDataBroker,
+ final SchemaService schemaService) {
+ this.delegateDataBroker = checkNotNull(delegateDataBroker);
+ this.schemaService = checkNotNull(schemaService);
+ }
+
+ @Override
+ public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ return new ShardedDOMDataBrokerDelegatingReadTransaction(newTransactionIdentifier(),
+ delegateDataBroker.newReadOnlyTransaction());
+ }
+
+ @Override
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ return new ShardedDOMDataBrokerDelegatingReadWriteTransaction(newTransactionIdentifier(), schemaService.getGlobalContext(),
+ newReadOnlyTransaction(), newWriteOnlyTransaction());
+ }
+
+ @Override
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ return new ShardedDOMDataBrokerDelegatingWriteTransaction(newTransactionIdentifier(),
+ delegateDataBroker.newWriteOnlyTransaction());
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path,
+ final DOMDataChangeListener listener,
+ final DataChangeScope triggeringScope) {
+ throw new UnsupportedOperationException("Registering data change listeners is not supported in " +
+ "md-sal forwarding data broker");
+
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ return new ShardedDOMDataBrokerDelegatingTransactionChain(chainNum.getAndIncrement(), schemaService.getGlobalContext(),
+ delegateDataBroker, listener);
+ }
+
+ @Nonnull
+ @Override
+ public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+ return Collections.emptyMap();
+ }
+
+ private Object newTransactionIdentifier() {
+ return "DOM-" + txNum.getAndIncrement();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+
+final class LegacyShardedDOMDataBrokerAdapterUtils {
+
+ private LegacyShardedDOMDataBrokerAdapterUtils() {
+ throw new AssertionError("Util class should not be instantiated");
+ }
+
+ public static org.opendaylight.mdsal.common.api.LogicalDatastoreType translateDataStoreType(final LogicalDatastoreType store) {
+ return store.equals(LogicalDatastoreType.CONFIGURATION) ?
+ org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION :
+ org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Read transaction that delegates calls to {@link org.opendaylight.mdsal.dom.broker.ShardedDOMReadTransactionAdapter},
+ * which in turn translates calls to shard aware implementation of {@link org.opendaylight.mdsal.dom.api.DOMDataTreeService}.
+ * <p>
+ * Since reading data distributed on different subshards is not guaranteed to
+ * return all relevant data, we cannot guarantee it neither. Best effort is to
+ * return all data we get from first initial data change event received.
+ */
+class ShardedDOMDataBrokerDelegatingReadTransaction implements DOMDataReadOnlyTransaction {
+ private final DOMDataTreeReadTransaction delegateTx;
+ private final Object txIdentifier;
+
+ public ShardedDOMDataBrokerDelegatingReadTransaction(final Object txIdentifier, final DOMDataTreeReadTransaction delegateTx) {
+ this.delegateTx = checkNotNull(delegateTx);
+ this.txIdentifier = checkNotNull(txIdentifier);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ return Futures.makeChecked(delegateTx.read(LegacyShardedDOMDataBrokerAdapterUtils.translateDataStoreType(store), path), ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ return Futures.makeChecked(delegateTx.exists(LegacyShardedDOMDataBrokerAdapterUtils.translateDataStoreType(store), path), ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return txIdentifier;
+ }
+
+ @Override
+ public void close() {
+ delegateTx.close();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Map;
+import java.util.Queue;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Read/write transaction that delegates write and initial read to {@link org.opendaylight.mdsal.dom.broker.ShardedDOMWriteTransactionAdapter}
+ * and {@link org.opendaylight.mdsal.dom.broker.ShardedDOMReadTransactionAdapter}
+ * respectively. These two in turn rely on shard aware implementation of {@link org.opendaylight.mdsal.dom.api.DOMDataTreeService}.
+ * <p>
+ * Since reading data distributed on different subshards is not guaranteed to
+ * return all relevant data, best effort is to try to operate only on single
+ * subtree in conceptual data tree. We define this subtree by first write
+ * operation performed on transaction. All next read and write operations
+ * should be performed just in this initial subtree.
+ */
+// FIXME explicitly enforce just one subtree requirement
+@NotThreadSafe
+class ShardedDOMDataBrokerDelegatingReadWriteTransaction implements DOMDataReadWriteTransaction {
+ private static final ListenableFuture<RpcResult<TransactionStatus>> SUCCESS_FUTURE =
+ Futures.immediateFuture(RpcResultBuilder.success(TransactionStatus.COMMITED).build());
+
+ private final DOMDataReadOnlyTransaction readTxDelegate;
+ private final DOMDataWriteTransaction writeTxDelegate;
+ private final Object txIdentifier;
+ private final ImmutableMap<LogicalDatastoreType, Queue<Modification>> modificationHistoryMap;
+ private final ImmutableMap<LogicalDatastoreType, DataTreeSnapshot> snapshotMap;
+ private final Map<LogicalDatastoreType, ListenableFuture<Optional<NormalizedNode<?, ?>>>> initialReadMap;
+ private YangInstanceIdentifier root = null;
+
+ public ShardedDOMDataBrokerDelegatingReadWriteTransaction(final Object readWriteTxId, final SchemaContext ctx,
+ final DOMDataReadOnlyTransaction readTxDelegate,
+ final DOMDataWriteTransaction writeTxDelegate) {
+ this.readTxDelegate = checkNotNull(readTxDelegate);
+ this.writeTxDelegate = checkNotNull(writeTxDelegate);
+ this.txIdentifier = checkNotNull(readWriteTxId);
+ this.initialReadMap = Maps.newEnumMap(LogicalDatastoreType.class);
+
+ final InMemoryDataTreeFactory treeFactory = InMemoryDataTreeFactory.getInstance();
+ final ImmutableMap.Builder<LogicalDatastoreType, DataTreeSnapshot> snapshotMapBuilder = ImmutableMap.builder();
+ final ImmutableMap.Builder<LogicalDatastoreType, Queue<Modification>> modificationHistoryMapBuilder =
+ ImmutableMap.builder();
+ for (final LogicalDatastoreType store : LogicalDatastoreType.values()) {
+ final DataTree tree = treeFactory.create(treeTypeForStore(store));
+ tree.setSchemaContext(ctx);
+ snapshotMapBuilder.put(store, tree.takeSnapshot());
+
+ modificationHistoryMapBuilder.put(store, Lists.newLinkedList());
+ }
+
+ modificationHistoryMap = modificationHistoryMapBuilder.build();
+ snapshotMap = snapshotMapBuilder.build();
+ }
+
+ @Override
+ public boolean cancel() {
+ readTxDelegate.close();
+ return writeTxDelegate.cancel();
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ if (root == null) {
+ initialRead(path);
+ }
+
+ modificationHistoryMap.get(store).add(new Modification(Modification.Operation.DELETE, path, null));
+ writeTxDelegate.delete(store, path);
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ return writeTxDelegate.submit();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return Futures.transform(submit(), (AsyncFunction<Void, RpcResult<TransactionStatus>>) input -> SUCCESS_FUTURE);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ checkState(root != null, "A modify operation (put, merge or delete) must be performed prior to a read operation");
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> readResult = SettableFuture.create();
+ final Queue<Modification> currentHistory = Lists.newLinkedList(modificationHistoryMap.get(store));
+ Futures.addCallback(initialReadMap.get(store), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public void onSuccess(@Nullable final Optional<NormalizedNode<?, ?>> result) {
+ final DataTreeModification mod = snapshotMap.get(store).newModification();
+ if (result.isPresent()) {
+ mod.write(path, result.get());
+ }
+ applyModificationHistoryToSnapshot(mod, currentHistory);
+ readResult.set(mod.readNode(path));
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ readResult.setException(t);
+ }
+ });
+
+ return Futures.makeChecked(readResult, ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ checkState(root != null, "A modify operation (put, merge or delete) must be performed prior to an exists operation");
+ return Futures.makeChecked(Futures.transform(read(store, path),
+ (Function<Optional<NormalizedNode<?, ?>>, Boolean>) Optional::isPresent),
+ ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
+ if (root == null) {
+ initialRead(path);
+ }
+
+ modificationHistoryMap.get(store).add(new Modification(Modification.Operation.WRITE, path, data));
+ writeTxDelegate.put(store, path, data);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
+ if (root == null) {
+ initialRead(path);
+ }
+
+ modificationHistoryMap.get(store).add(new Modification(Modification.Operation.MERGE, path, data));
+ writeTxDelegate.merge(store, path, data);
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return txIdentifier;
+ }
+
+ private void initialRead(final YangInstanceIdentifier path) {
+ root = path;
+
+ final InMemoryDataTreeFactory treeFactory = InMemoryDataTreeFactory.getInstance();
+ for (final LogicalDatastoreType store : LogicalDatastoreType.values()) {
+ initialReadMap.put(store, readTxDelegate.read(store, path));
+ }
+ }
+
+ private TreeType treeTypeForStore(final LogicalDatastoreType store) {
+ return store == LogicalDatastoreType.CONFIGURATION ? TreeType.CONFIGURATION : TreeType.OPERATIONAL;
+ }
+
+ private void applyModificationHistoryToSnapshot(final DataTreeModification dataTreeModification,
+ final Queue<Modification> modificationHistory) {
+ while (!modificationHistory.isEmpty()) {
+ final Modification modification = modificationHistory.poll();
+ switch (modification.getOperation()) {
+ case WRITE:
+ dataTreeModification.write(modification.getPath(), modification.getData());
+ break;
+ case MERGE:
+ dataTreeModification.merge(modification.getPath(), modification.getData());
+ break;
+ case DELETE:
+ dataTreeModification.delete(modification.getPath());
+ break;
+ default:
+ // NOOP
+ }
+ }
+ }
+
+ static class Modification {
+
+ enum Operation {
+ WRITE, MERGE, DELETE
+ }
+
+ private final NormalizedNode<?, ?> data;
+ private final YangInstanceIdentifier path;
+ private final Operation operation;
+
+ Modification(final Operation operation, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ this.data = data;
+ this.path = checkNotNull(path);
+ this.operation = checkNotNull(operation);
+ }
+
+ Operation getOperation() {
+ return operation;
+ }
+
+ YangInstanceIdentifier getPath() {
+ return path;
+ }
+
+ NormalizedNode<?, ?> getData() {
+ return data;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.common.api.TransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+
+class ShardedDOMDataBrokerDelegatingTransactionChain implements DOMTransactionChain,
+ org.opendaylight.mdsal.common.api.TransactionChainListener {
+ private final org.opendaylight.mdsal.dom.api.DOMTransactionChain txChainDelegate;
+ private final SchemaContext schemaContext;
+ private final TransactionChainListener txChainListener;
+ private final Object txChainIdentifier;
+ private final AtomicLong txNum = new AtomicLong();
+
+ private final Map<Object, AsyncTransaction<?, ?>> transactionMap;
+
+ public ShardedDOMDataBrokerDelegatingTransactionChain(final Object txChainIdentifier, final SchemaContext schemaContext,
+ final org.opendaylight.mdsal.dom.api.DOMDataBroker brokerDelegate,
+ final TransactionChainListener txChainListener) {
+ checkNotNull(brokerDelegate);
+ this.schemaContext = checkNotNull(schemaContext);
+ this.txChainIdentifier = checkNotNull(txChainIdentifier);
+ this.txChainListener = checkNotNull(txChainListener);
+ this.txChainDelegate = brokerDelegate.createTransactionChain(this);
+ transactionMap = Maps.newHashMap();
+ }
+
+ @Override
+ public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ final DOMDataTreeReadTransaction readTxDelegate = txChainDelegate.newReadOnlyTransaction();
+ final DOMDataReadOnlyTransaction readTx =
+ new ShardedDOMDataBrokerDelegatingReadTransaction(newTransactionIdentifier(), readTxDelegate);
+ transactionMap.put(readTxDelegate.getIdentifier(), readTx);
+
+ return readTx;
+ }
+
+ @Override
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ final Object readWriteTxId = newTransactionIdentifier();
+ final DOMDataTreeReadTransaction readTxDelegate = txChainDelegate.newReadOnlyTransaction();
+ final DOMDataReadOnlyTransaction readTx =
+ new ShardedDOMDataBrokerDelegatingReadTransaction(readWriteTxId, readTxDelegate);
+
+ final DOMDataTreeWriteTransaction writeTxDelegate = txChainDelegate.newWriteOnlyTransaction();
+ final DOMDataWriteTransaction writeTx =
+ new ShardedDOMDataBrokerDelegatingWriteTransaction(readWriteTxId, writeTxDelegate);
+
+ final DOMDataReadWriteTransaction readWriteTx =
+ new ShardedDOMDataBrokerDelegatingReadWriteTransaction(readWriteTxId, schemaContext,
+ readTx, writeTx);
+ transactionMap.put(readTxDelegate.getIdentifier(), readWriteTx);
+ transactionMap.put(writeTxDelegate.getIdentifier(), readWriteTx);
+
+ return readWriteTx;
+ }
+
+ @Override
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ final DOMDataTreeWriteTransaction writeTxDelegate = txChainDelegate.newWriteOnlyTransaction();
+ final DOMDataWriteTransaction writeTx =
+ new ShardedDOMDataBrokerDelegatingWriteTransaction(newTransactionIdentifier(), writeTxDelegate);
+ transactionMap.put(writeTxDelegate.getIdentifier(), writeTx);
+
+ return writeTx;
+ }
+
+ @Override
+ public void close() {
+ txChainDelegate.close();
+ }
+
+ @Override
+ public void onTransactionChainFailed(final TransactionChain<?, ?> transactionChain,
+ final org.opendaylight.mdsal.common.api.AsyncTransaction<?, ?> asyncTransaction,
+ final Throwable throwable) {
+ txChainListener.onTransactionChainFailed(
+ this, transactionFromDelegate(asyncTransaction.getIdentifier()), throwable);
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(final TransactionChain<?, ?> transactionChain) {
+ txChainListener.onTransactionChainSuccessful(this);
+ }
+
+ private AsyncTransaction<?, ?> transactionFromDelegate(final Object delegateId) {
+ Preconditions.checkState(transactionMap.containsKey(delegateId),
+ "Delegate transaction {} is not present in transaction chain history", delegateId);
+ return transactionMap.get(delegateId);
+ }
+
+ private Object newTransactionIdentifier() {
+ return "DOM-CHAIN-" + txChainIdentifier + "-" + txNum.getAndIncrement();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+class ShardedDOMDataBrokerDelegatingWriteTransaction implements DOMDataWriteTransaction {
+ private static final ListenableFuture<RpcResult<TransactionStatus>> SUCCESS_FUTURE =
+ Futures.immediateFuture(RpcResultBuilder.success(TransactionStatus.COMMITED).build());
+
+ private final DOMDataTreeWriteTransaction delegateTx;
+ private final Object txIdentifier;
+
+ public ShardedDOMDataBrokerDelegatingWriteTransaction(final Object txIdentifier, final DOMDataTreeWriteTransaction delegateTx) {
+ this.delegateTx = checkNotNull(delegateTx);
+ this.txIdentifier = checkNotNull(txIdentifier);
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ delegateTx.put(LegacyShardedDOMDataBrokerAdapterUtils.translateDataStoreType(store), path, data);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ delegateTx.merge(LegacyShardedDOMDataBrokerAdapterUtils.translateDataStoreType(store), path, data);
+ }
+
+ @Override
+ public boolean cancel() {
+ return delegateTx.cancel();
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ delegateTx.delete(LegacyShardedDOMDataBrokerAdapterUtils.translateDataStoreType(store), path);
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ return Futures.makeChecked(delegateTx.submit(), TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return Futures.transform(submit(), (AsyncFunction<Void, RpcResult<TransactionStatus>>) input -> SUCCESS_FUTURE);
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return txIdentifier;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+public class ShardedDOMDataBrokerDelegatingReadWriteTransactionTest {
+
+ @Mock
+ private DOMDataWriteTransaction writeTx;
+
+ @Mock
+ private DOMDataReadOnlyTransaction readTx;
+
+ private ShardedDOMDataBrokerDelegatingReadWriteTransaction rwTx;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ doNothing().when(writeTx).put(any(), any(), any());
+ doNothing().when(writeTx).merge(any(), any(), any());
+ doNothing().when(writeTx).delete(any(), any());
+ rwTx = new ShardedDOMDataBrokerDelegatingReadWriteTransaction("TEST-TX", TestModel.createTestContext(), readTx, writeTx);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testFirstReadShouldFail() {
+ rwTx.read(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH);
+ }
+
+ @Test
+ public void testGetIdentifier() {
+ assertEquals("TEST-TX", rwTx.getIdentifier());
+ }
+
+ @Test
+ public void testReadWriteOperations() throws Exception {
+ doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(readTx)
+ .read(any(), any());
+ rwTx.put(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH,
+ testNodeWithOuter(1, 2, 3));
+
+ verify(writeTx).put(eq(LogicalDatastoreType.OPERATIONAL), Matchers.eq(TestModel.TEST_PATH),
+ Matchers.eq(testNodeWithOuter(1, 2, 3)));
+ verify(readTx).read(eq(LogicalDatastoreType.OPERATIONAL), Matchers.eq(TestModel.TEST_PATH));
+
+ assertEquals(testNodeWithOuter(1, 2, 3),
+ rwTx.read(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH).checkedGet().get());
+
+ rwTx.merge(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH,
+ testNodeWithOuter(4, 5, 6));
+ assertEquals(testNodeWithOuter(1, 2, 3, 4, 5, 6),
+ rwTx.read(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH).checkedGet().get());
+
+ rwTx.delete(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH);
+
+ verify(writeTx).delete(eq(LogicalDatastoreType.OPERATIONAL), Matchers.eq(TestModel.TEST_PATH));
+ assertEquals(Optional.absent(),
+ rwTx.read(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH).checkedGet());
+ }
+
+ private DataContainerChild<?, ?> outerNode(int... ids) {
+ CollectionNodeBuilder<MapEntryNode, MapNode> outer = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME);
+ for(int id: ids) {
+ outer.addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, id));
+ }
+
+ return outer.build();
+ }
+
+ private NormalizedNode<?, ?> testNodeWithOuter(int... ids) {
+ return testNodeWithOuter(outerNode(ids));
+ }
+
+ private NormalizedNode<?, ?> testNodeWithOuter(DataContainerChild<?, ?> outer) {
+ return ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).withChild(outer).build();
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+
+public class ShardedDOMDataBrokerDelegatingTransactionChainTest {
+
+ @Mock
+ private DOMDataBroker dataBroker;
+
+ @Mock
+ private DOMTransactionChain delegateTxChain;
+
+ @Mock
+ private TransactionChainListener txChainlistener;
+
+ private ShardedDOMDataBrokerDelegatingTransactionChain txChain;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(delegateTxChain).when(dataBroker).createTransactionChain(any());
+ txChain = new ShardedDOMDataBrokerDelegatingTransactionChain("1", TestModel.createTestContext(), dataBroker, txChainlistener);
+ }
+
+ @Test
+ public void testClose() {
+ doNothing().when(delegateTxChain).close();
+ txChain.close();
+ verify(delegateTxChain).close();
+ }
+
+ @Test
+ public void testNewWriteTransaction() {
+ DOMDataTreeWriteTransaction delegateWriteTx = mock(DOMDataTreeWriteTransaction.class);
+ doReturn(delegateWriteTx).when(delegateTxChain).newWriteOnlyTransaction();
+ doReturn("TEST-WRITE-TX-DELEGATE").when(delegateWriteTx).getIdentifier();
+ txChain.newWriteOnlyTransaction();
+ verify(delegateTxChain).newWriteOnlyTransaction();
+ }
+
+ @Test
+ public void testNewReadOnlyTransaction() {
+ DOMDataTreeReadTransaction delegateReadTx = mock(DOMDataTreeReadTransaction.class);
+ doReturn("TEST-READ-TX-DELEGATE").when(delegateReadTx).getIdentifier();
+ doReturn(delegateReadTx).when(delegateTxChain).newReadOnlyTransaction();
+ txChain.newReadOnlyTransaction();
+ verify(delegateTxChain).newReadOnlyTransaction();
+ }
+
+
+ @Test
+ public void testNewReadWriteTransaction() {
+ DOMDataTreeReadTransaction delegateReadTx = mock(DOMDataTreeReadTransaction.class);
+ doReturn("TEST-READ-TX-DELEGATE").when(delegateReadTx).getIdentifier();
+ doReturn(delegateReadTx).when(delegateTxChain).newReadOnlyTransaction();
+
+ DOMDataTreeWriteTransaction delegateWriteTx = mock(DOMDataTreeWriteTransaction.class);
+ doReturn(delegateWriteTx).when(delegateTxChain).newWriteOnlyTransaction();
+ doReturn("TEST-WRITE-TX-DELEGATE").when(delegateWriteTx).getIdentifier();
+
+ txChain.newReadWriteTransaction();
+ verify(delegateTxChain).newReadOnlyTransaction();
+ verify(delegateTxChain).newWriteOnlyTransaction();
+ }
+
+ @Test
+ public void testTransactionChainFailed() {
+ final DOMDataTreeWriteTransaction writeTxDelegate = mock(DOMDataTreeWriteTransaction.class);
+ doReturn("DELEGATE-WRITE-TX-1").when(writeTxDelegate).getIdentifier();
+ doReturn(writeTxDelegate).when(delegateTxChain).newWriteOnlyTransaction();
+ doNothing().when(txChainlistener).onTransactionChainFailed(any(), any(), any());
+
+ // verify writetx fail
+ txChain.newWriteOnlyTransaction();
+ txChain.onTransactionChainFailed(delegateTxChain, writeTxDelegate, new Throwable("Fail"));
+
+ final ArgumentCaptor<AsyncTransaction> txCaptor = ArgumentCaptor.forClass(AsyncTransaction.class);
+ final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
+ verify(txChainlistener)
+ .onTransactionChainFailed(eq(txChain), txCaptor.capture(), throwableCaptor.capture());
+ assertEquals("DOM-CHAIN-1-0", txCaptor.getValue().getIdentifier());
+ assertEquals("Fail", throwableCaptor.getValue().getMessage());
+
+ // verify readtx fail
+ final DOMDataTreeReadTransaction readTxDelegate = mock(DOMDataTreeReadTransaction.class);
+ doReturn("DELEGATE-READ-TX-1").when(readTxDelegate).getIdentifier();
+ doReturn(readTxDelegate).when(delegateTxChain).newReadOnlyTransaction();
+ doNothing().when(txChainlistener).onTransactionChainFailed(any(), any(), any());
+ txChain.newReadOnlyTransaction();
+ txChain.onTransactionChainFailed(delegateTxChain, readTxDelegate, new Throwable("Fail"));
+ verify(txChainlistener, times(2))
+ .onTransactionChainFailed(eq(txChain), txCaptor.capture(), throwableCaptor.capture());
+ assertEquals("DOM-CHAIN-1-1", txCaptor.getValue().getIdentifier());
+ assertEquals("Fail", throwableCaptor.getValue().getMessage());
+
+
+ // verify readwritetx fail, we must check both read and write failure
+ // translates to returned readwritetx
+
+ // we can reuse write and read tx delegates, just return different
+ // identifiers to avoid conflicts in keys in tx dictionary
+ doReturn("DELEGATE-WRITE-RWTX-1").when(writeTxDelegate).getIdentifier();
+ doReturn("DELEGATE-READ-RWTX-1").when(readTxDelegate).getIdentifier();
+ txChain.newReadWriteTransaction();
+ txChain.onTransactionChainFailed(delegateTxChain, writeTxDelegate, new Throwable("Fail"));
+ verify(txChainlistener, times(3))
+ .onTransactionChainFailed(eq(txChain), txCaptor.capture(), throwableCaptor.capture());
+ assertEquals("DOM-CHAIN-1-2", txCaptor.getValue().getIdentifier());
+ assertEquals("Fail", throwableCaptor.getValue().getMessage());
+
+ txChain.onTransactionChainFailed(delegateTxChain, readTxDelegate, new Throwable("Fail"));
+ verify(txChainlistener, times(4))
+ .onTransactionChainFailed(eq(txChain), txCaptor.capture(), throwableCaptor.capture());
+ assertEquals("DOM-CHAIN-1-2", txCaptor.getValue().getIdentifier());
+ assertEquals("Fail", throwableCaptor.getValue().getMessage());
+ }
+
+ @Test
+ public void testTransactionChainSuccessful() {
+ doNothing().when(txChainlistener).onTransactionChainSuccessful(any());
+ txChain.onTransactionChainSuccessful(delegateTxChain);
+ verify(txChainlistener).onTransactionChainSuccessful(eq(txChain));
+ }
+}
\ No newline at end of file