--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.mdsal.common.api.TransactionChainListener;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+
+public class ShardedDOMDataBrokerAdapter implements DOMDataBroker {
+
+ private final DOMDataTreeService service;
+ private final AtomicLong txNum = new AtomicLong();
+ private final AtomicLong chainNum = new AtomicLong();
+
+ public ShardedDOMDataBrokerAdapter(final DOMDataTreeService service) {
+ this.service = service;
+ }
+
+ @Override
+ public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public DOMDataTreeReadTransaction newReadOnlyTransaction() {
+ return new ShardedDOMReadTransactionAdapter(newTransactionIdentifier(), service);
+ }
+
+ @Override
+ public DOMDataTreeWriteTransaction newWriteOnlyTransaction() {
+ return new ShardedDOMWriteTransactionAdapter(newTransactionIdentifier(), service);
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ return new ShardedDOMTransactionChainAdapter(newChainIdentifier(), service, listener);
+ }
+
+ private Object newTransactionIdentifier() {
+ return "DOM-" + txNum.getAndIncrement();
+ }
+
+ private Object newChainIdentifier() {
+ return "DOM-CHAIN-" + chainNum;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ShardedDOMReadTransactionAdapter implements DOMDataTreeReadTransaction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMReadTransactionAdapter.class.getName());
+
+ private final List<ListenerRegistration<DOMDataTreeListener>> registrations = Lists.newArrayList();
+ private final DOMDataTreeService service;
+ private final Object txIdentifier;
+
+ private boolean finished = false;
+
+ ShardedDOMReadTransactionAdapter(final Object identifier, final DOMDataTreeService service) {
+ this.service = Preconditions.checkNotNull(service);
+ this.txIdentifier = Preconditions.checkNotNull(identifier);
+ }
+
+ @Override
+ public void close() {
+ // TODO should we also cancel all read futures?
+ LOG.debug("{}: Closing read transaction", txIdentifier);
+ if (finished == true) {
+ return;
+ }
+
+ registrations.forEach(ListenerRegistration::close);
+ finished = true;
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return txIdentifier;
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ checkRunning();
+ LOG.debug("{}: Invoking read at {}:{}", txIdentifier, store, path);
+ final ListenerRegistration<DOMDataTreeListener> reg;
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> initialDataTreeChangeFuture = SettableFuture.create();
+ try {
+ reg = service.registerListener(new ReadShardedListener(initialDataTreeChangeFuture),
+ Collections.singleton(new DOMDataTreeIdentifier(store, path)), false, Collections.emptyList());
+ registrations.add(reg);
+ } catch (final DOMDataTreeLoopException e) {
+ // This should not happen, we are not specifying any
+ // producers when registering listener
+ throw new IllegalStateException("Loop in listener and producers detected", e);
+ }
+
+ // After data tree change future is finished, we can close the listener registration
+ Futures.addCallback(initialDataTreeChangeFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public void onSuccess(@Nullable final Optional<NormalizedNode<?, ?>> result) {
+ reg.close();
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ reg.close();
+ }
+ });
+
+ return Futures.makeChecked(initialDataTreeChangeFuture, ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ checkRunning();
+ LOG.debug("{}: Invoking exists at {}:{}", txIdentifier, store, path);
+ final Function<Optional<NormalizedNode<?, ?>>, Boolean> transform =
+ optionalNode -> optionalNode.isPresent() ? Boolean.TRUE : Boolean.FALSE;
+ final ListenableFuture<Boolean> existsResult = Futures.transform(read(store, path), transform);
+ return Futures.makeChecked(existsResult, ReadFailedException.MAPPER);
+ }
+
+ private void checkRunning() {
+ Preconditions.checkState(finished == false, "Transaction is already closed");
+ }
+
+ static class ReadShardedListener implements DOMDataTreeListener {
+
+ private final SettableFuture<Optional<NormalizedNode<?, ?>>> readResultFuture;
+
+ public ReadShardedListener(final SettableFuture<Optional<NormalizedNode<?, ?>>> future) {
+ this.readResultFuture = Preconditions.checkNotNull(future);
+ }
+
+ @Override
+ public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
+ Preconditions.checkState(changes.size() == 1 && subtrees.size() == 1,
+ "DOMDataTreeListener registered exactly on one subtree");
+
+ for (final DataTreeCandidate change : changes) {
+ if (change.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) {
+ readResultFuture.set(Optional.absent());
+ return;
+ }
+ }
+
+ for (final NormalizedNode initialState : subtrees.values()) {
+ readResultFuture.set(Optional.of(initialState));
+ }
+ }
+
+ @Override
+ public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
+ // TODO If we get just one exception, we don't need to do
+ // chaining
+
+ // We chain all exceptions and return aggregated one
+ readResultFuture.setException(new DOMDataTreeListeningException("Aggregated DOMDataTreeListening exception",
+ causes.stream().reduce((e1, e2) ->
+ {
+ e1.addSuppressed(e2);
+ return e1;
+ }).get()));
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.AsyncTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionChainListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+public class ShardedDOMTransactionChainAdapter implements DOMTransactionChain {
+
+ private final DOMDataTreeService dataTreeService;
+ private final Object txChainIdentifier;
+ private final AtomicLong txNum = new AtomicLong();
+ private final TransactionChainListener txChainListener;
+ private final CachedDataTreeService cachedDataTreeService;
+ private TransactionChainWriteTransaction writeTx;
+ private TransactionChainReadTransaction readTx;
+ private ListenableFuture<Void> writeTxSubmitFuture;
+ private boolean finished = false;
+
+ public ShardedDOMTransactionChainAdapter(final Object txChainIdentifier,
+ final DOMDataTreeService dataTreeService,
+ final TransactionChainListener txChainListener) {
+ Preconditions.checkNotNull(dataTreeService);
+ Preconditions.checkNotNull(txChainIdentifier);
+ this.dataTreeService = dataTreeService;
+ this.txChainIdentifier = txChainIdentifier;
+ this.txChainListener = txChainListener;
+ this.cachedDataTreeService = new CachedDataTreeService(dataTreeService);
+ }
+
+ @Override
+ public DOMDataTreeReadTransaction newReadOnlyTransaction() {
+ checkRunning();
+ checkReadTxClosed();
+ checkWriteTxClosed();
+ readTx = new TransactionChainReadTransaction(newTransactionIdentifier(),
+ new ShardedDOMReadTransactionAdapter(newTransactionIdentifier(), dataTreeService),
+ writeTxSubmitFuture, this);
+
+ return readTx;
+ }
+
+ @Override
+ public DOMDataTreeWriteTransaction newWriteOnlyTransaction() {
+ checkRunning();
+ checkWriteTxClosed();
+ checkReadTxClosed();
+ writeTx = new TransactionChainWriteTransaction(newTransactionIdentifier(),
+ new ShardedDOMWriteTransactionAdapter(newTransactionIdentifier(),
+ cachedDataTreeService), this);
+
+ return writeTx;
+ }
+
+ @Override
+ public void close() {
+ if (finished = true) {
+ // already closed, do nothing
+ return;
+ }
+
+ checkReadTxClosed();
+ checkWriteTxClosed();
+ Futures.addCallback(writeTxSubmitFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable final Void result) {
+ txChainListener.onTransactionChainSuccessful(ShardedDOMTransactionChainAdapter.this);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ // We don't have to do nothing here,
+ // tx should take car of it
+ }
+ });
+
+ cachedDataTreeService.closeProducers();
+ finished = true;
+ }
+
+ public void closeReadTransaction() {
+ readTx = null;
+ }
+
+ public void closeWriteTransaction(final ListenableFuture<Void> submitFuture) {
+ writeTxSubmitFuture = submitFuture;
+ writeTx = null;
+ }
+
+ private Object newTransactionIdentifier() {
+ return "DOM-CHAIN-" + txChainIdentifier + "-" + txNum.getAndIncrement();
+ }
+
+ private void checkWriteTxClosed() {
+ Preconditions.checkState(writeTx == null);
+ }
+
+ private void checkReadTxClosed() {
+ Preconditions.checkState(readTx == null);
+ }
+
+ private void checkRunning() {
+ Preconditions.checkState(finished == false);
+ }
+
+ public void transactionFailed(final AsyncTransaction<?, ?> tx, final Throwable cause) {
+ txChainListener.onTransactionChainFailed(this, tx, cause);
+ if (writeTx != null) {
+ writeTx.cancel();
+ }
+ if (readTx != null) {
+ readTx.close();
+ }
+ cachedDataTreeService.closeProducers();
+ finished = true;
+ }
+
+ static class CachedDataTreeService implements DOMDataTreeService {
+
+ private final DOMDataTreeService delegateTreeService;
+ private final Map<LogicalDatastoreType, NoopCloseDataProducer> producersMap =
+ new EnumMap<>(LogicalDatastoreType.class);
+
+ CachedDataTreeService(final DOMDataTreeService delegateTreeService) {
+ this.delegateTreeService = delegateTreeService;
+ }
+
+ void closeProducers() {
+ producersMap.values().forEach(NoopCloseDataProducer::closeDelegate);
+ }
+
+ @Nonnull
+ @Override
+ public <T extends DOMDataTreeListener> ListenerRegistration<T>
+ registerListener(@Nonnull final T listener, @Nonnull final Collection<DOMDataTreeIdentifier> subtrees,
+ final boolean allowRxMerges, @Nonnull final Collection<DOMDataTreeProducer> producers)
+ throws DOMDataTreeLoopException {
+ return delegateTreeService.registerListener(listener, subtrees, allowRxMerges, producers);
+ }
+
+ @Override
+ public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+ Preconditions.checkState(subtrees.size() == 1);
+ NoopCloseDataProducer producer = null;
+ for (final DOMDataTreeIdentifier treeId : subtrees) {
+ producer =
+ new NoopCloseDataProducer(delegateTreeService.createProducer(Collections.singleton(treeId)));
+ producersMap.putIfAbsent(treeId.getDatastoreType(),
+ producer);
+ }
+ return producer;
+ }
+
+ static class NoopCloseDataProducer implements DOMDataTreeProducer {
+
+ private final DOMDataTreeProducer delegateTreeProducer;
+
+ NoopCloseDataProducer(final DOMDataTreeProducer delegateTreeProducer) {
+ this.delegateTreeProducer = delegateTreeProducer;
+ }
+
+ @Nonnull
+ @Override
+ public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
+ return delegateTreeProducer.createTransaction(isolated);
+ }
+
+ @Nonnull
+ @Override
+ public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+ return delegateTreeProducer.createProducer(subtrees);
+ }
+
+ @Override
+ public void close() throws DOMDataTreeProducerException {
+ // noop
+ }
+
+ public void closeDelegate() {
+ try {
+ delegateTreeProducer.close();
+ } catch (final DOMDataTreeProducerException e) {
+ throw new IllegalStateException("Trying to close DOMDataTreeProducer with open transaction", e);
+ }
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ShardedDOMWriteTransactionAdapter implements DOMDataTreeWriteTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMWriteTransactionAdapter.class);
+
+ private final Map<LogicalDatastoreType, DOMDataTreeProducer> producerMap;
+ private final Map<LogicalDatastoreType, DOMDataTreeCursorAwareTransaction> transactionMap;
+ private final Map<LogicalDatastoreType, DOMDataTreeWriteCursor> cursorMap;
+
+ private final DOMDataTreeService treeService;
+ private final Object txIdentifier;
+ private boolean finished = false;
+ private boolean initialized = false;
+
+ ShardedDOMWriteTransactionAdapter(final Object identifier, final DOMDataTreeService transactionDelegator) {
+ this.treeService = Preconditions.checkNotNull(transactionDelegator);
+ this.txIdentifier = Preconditions.checkNotNull(identifier);
+ this.producerMap = new EnumMap<>(LogicalDatastoreType.class);
+ this.transactionMap = new EnumMap<>(LogicalDatastoreType.class);
+ this.cursorMap = new EnumMap<>(LogicalDatastoreType.class);
+ }
+
+ @Override
+ public boolean cancel() {
+ LOG.debug("{}: Cancelling transaction");
+ if (finished == true) {
+ return false;
+ }
+
+ // We close cursor, cancel transactions and close producers and
+ // mark transaction as finished
+ cursorMap.values().forEach(DOMDataTreeWriteCursor::close);
+ transactionMap.values().forEach(domDataTreeCursorAwareTransaction ->
+ Preconditions.checkState(domDataTreeCursorAwareTransaction.cancel()));
+ closeProducers();
+ finished = true;
+ return true;
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ checkRunning();
+ LOG.debug("{}: Submitting transaction", txIdentifier);
+ if (initialized == false) {
+ // If underlying producers, transactions and cursors are
+ // not even initialized just seal this transaction and
+ // return immediate future
+ finished = true;
+ return Futures.immediateCheckedFuture(null);
+ }
+ // First we need to close cursors
+ cursorMap.values().forEach(DOMDataTreeWriteCursor::close);
+ final ListenableFuture<List<Void>> aggregatedSubmit = Futures.allAsList(
+ transactionMap.get(LogicalDatastoreType.CONFIGURATION).submit(),
+ transactionMap.get(LogicalDatastoreType.OPERATIONAL).submit());
+
+ // Now we can close producers and mark transaction as finished
+ closeProducers();
+ finished = true;
+
+ return Futures.makeChecked(
+ Futures.transform(aggregatedSubmit, (List<Void> input) -> input.get(0)),
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return txIdentifier;
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
+ checkRunning();
+ LOG.debug("{}: Invoking put operation at {}:{} with payload {}", txIdentifier, store, path);
+ if (initialized == false) {
+ initializeDataTreeProducerLayer(path.getParent());
+ }
+
+ final DOMDataTreeWriteCursor cursor = cursorMap.get(store);
+ cursor.write(path.getLastPathArgument(), data);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
+ checkRunning();
+ LOG.debug("{}: Invoking merge operation at {}:{} with payload {}", txIdentifier, store, path);
+ if (initialized == false) {
+ initializeDataTreeProducerLayer(path.getParent());
+ }
+
+ final DOMDataTreeWriteCursor cursor = cursorMap.get(store);
+ cursor.merge(path.getLastPathArgument(), data);
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ checkRunning();
+ LOG.debug("{}: Invoking delete operation at {}:{}", txIdentifier, store, path);
+ if (initialized == false) {
+ initializeDataTreeProducerLayer(path.getParent());
+ }
+
+ final DOMDataTreeWriteCursor cursor = cursorMap.get(store);
+ cursor.delete(path.getLastPathArgument());
+ }
+
+ // TODO initialize producer, transaction and cursor for only
+ // for necessary data store at one time
+ private void initializeDataTreeProducerLayer(final YangInstanceIdentifier path) {
+ Preconditions.checkState(producerMap.isEmpty(), "Producers already initialized");
+ Preconditions.checkState(transactionMap.isEmpty(), "Transactions already initialized");
+ Preconditions.checkState(cursorMap.isEmpty(), "Cursors already initialized");
+
+ LOG.debug("{}: Creating data tree producers on path {}", txIdentifier, path);
+ producerMap.put(LogicalDatastoreType.CONFIGURATION,
+ treeService.createProducer(
+ Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path))));
+ producerMap.put(LogicalDatastoreType.OPERATIONAL,
+ treeService.createProducer(
+ Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path))));
+
+ LOG.debug("{}: Creating DOMDataTreeCursorAwareTransactions delegates", txIdentifier, path);
+ transactionMap.put(LogicalDatastoreType.CONFIGURATION,
+ producerMap.get(LogicalDatastoreType.CONFIGURATION).createTransaction(true));
+ transactionMap.put(LogicalDatastoreType.OPERATIONAL,
+ producerMap.get(LogicalDatastoreType.OPERATIONAL).createTransaction(true));
+
+ LOG.debug("{}: Creating DOMDataTreeWriteCursors delegates");
+ cursorMap.put(LogicalDatastoreType.CONFIGURATION,
+ transactionMap.get(LogicalDatastoreType.CONFIGURATION)
+ .createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path)));
+ cursorMap.put(LogicalDatastoreType.OPERATIONAL,
+ transactionMap.get(LogicalDatastoreType.OPERATIONAL)
+ .createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path)));
+
+ initialized = true;
+ }
+
+ private void checkRunning() {
+ Preconditions.checkState(finished == false, "{}: Transaction already finished");
+ }
+
+ private void closeProducers() {
+ producerMap.values().forEach(domDataTreeProducer ->
+ {
+ try {
+ domDataTreeProducer.close();
+ } catch (final DOMDataTreeProducerException e) {
+ throw new IllegalStateException("Trying to close DOMDataTreeProducer with open transaction", e);
+ }
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class TransactionChainReadTransaction implements DOMDataTreeReadTransaction {
+
+ private final DOMDataTreeReadTransaction delegateReadTx;
+ private final ListenableFuture<Void> previousWriteTxFuture;
+ private final Object identifier;
+ private final ShardedDOMTransactionChainAdapter txChain;
+
+ TransactionChainReadTransaction(final Object txIdentifier, final DOMDataTreeReadTransaction delegateReadTx,
+ final ListenableFuture<Void> previousWriteTxFuture,
+ final ShardedDOMTransactionChainAdapter txChain) {
+ this.delegateReadTx = delegateReadTx;
+ this.previousWriteTxFuture = previousWriteTxFuture;
+ this.identifier = txIdentifier;
+ this.txChain = txChain;
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> readResult = SettableFuture.create();
+
+ Futures.addCallback(previousWriteTxFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable final Void result) {
+ Futures.addCallback(delegateReadTx.read(store, path),
+ new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+ @Override
+ public void onSuccess(@Nullable final Optional<NormalizedNode<?, ?>> result) {
+ readResult.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ txChain.transactionFailed(TransactionChainReadTransaction.this, t);
+ readResult.setException(t);
+ }
+ });
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ // we don't have to notify txchain about this failure
+ // failed write transaction should do this
+ readResult.setException(t);
+ }
+ });
+
+ return Futures.makeChecked(readResult, ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ final Function<Optional<NormalizedNode<?, ?>>, Boolean> transform =
+ optionalNode -> optionalNode.isPresent() ? Boolean.TRUE : Boolean.FALSE;
+ final ListenableFuture<Boolean> existsResult = Futures.transform(read(store, path), transform);
+ return Futures.makeChecked(existsResult, ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public void close() {
+ delegateReadTx.close();
+ txChain.closeReadTransaction();
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return identifier;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class TransactionChainWriteTransaction implements DOMDataTreeWriteTransaction {
+ private final DOMDataTreeWriteTransaction delegateTx;
+ private final Object identifier;
+ private final ShardedDOMTransactionChainAdapter txChain;
+
+ public TransactionChainWriteTransaction(final Object identifier, final DOMDataTreeWriteTransaction delegateTx,
+ final ShardedDOMTransactionChainAdapter txChain) {
+ this.delegateTx = Preconditions.checkNotNull(delegateTx);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.txChain = Preconditions.checkNotNull(txChain);
+ }
+
+
+ @Override
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ delegateTx.put(store, path, data);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ delegateTx.merge(store, path, data);
+ }
+
+ @Override
+ public boolean cancel() {
+ txChain.closeWriteTransaction(Futures.immediateFuture(null));
+ return delegateTx.cancel();
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ delegateTx.delete(store, path);
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ final CheckedFuture<Void, TransactionCommitFailedException> writeResultFuture = delegateTx.submit();
+ Futures.addCallback(writeResultFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable final Void result) {
+ // NOOP
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ txChain.transactionFailed(TransactionChainWriteTransaction.this, t);
+ }
+ });
+
+ txChain.closeWriteTransaction(writeResultFuture);
+ return writeResultFuture;
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return identifier;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.broker.util.TestModel;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+
+public class ShardedDOMReadTransactionAdapterTest {
+
+ private ShardedDOMReadTransactionAdapter readTx;
+
+ @Before
+ public void setUp() {
+ readTx = new ShardedDOMReadTransactionAdapter("TEST-TX", new TestTreeService());
+ }
+
+ @Test
+ public void testGetIdentifier() {
+ assertEquals("TEST-TX", readTx.getIdentifier());
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readResult =
+ readTx.read(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+ assertEquals(readResult.checkedGet().get(), TestUtils.TEST_CONTAINER);
+ }
+
+
+ private static class TestTreeService implements DOMDataTreeService {
+
+ @Nonnull
+ @Override
+ public <T extends DOMDataTreeListener> ListenerRegistration<T>
+ registerListener(@Nonnull final T listener, @Nonnull final Collection<DOMDataTreeIdentifier> subtrees,
+ final boolean allowRxMerges,
+ @Nonnull final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtree = Maps.newHashMap();
+ subtree.put(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH),
+ TestUtils.TEST_CONTAINER);
+
+ listener.onDataTreeChanged(Collections.singleton(
+ DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, TestUtils.TEST_CONTAINER)), subtree);
+
+ return new ListenerRegistration<T>() {
+ @Override
+ public void close() {
+ // NOOP
+ }
+
+ @Override
+ public T getInstance() {
+ return listener;
+ }
+ };
+ }
+
+ @Nonnull
+ @Override
+ public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+ return null;
+ }
+ }
+}
\ No newline at end of file