Bug 5699 - Migrate existing code to use new sharding apis 87/42887/5
authorJakub Morvay <jmorvay@cisco.com>
Mon, 1 Aug 2016 07:36:39 +0000 (09:36 +0200)
committerJakub Morvay <jmorvay@cisco.com>
Tue, 2 Aug 2016 14:38:26 +0000 (14:38 +0000)
This adds adapters that translates DOMDataBroker API calls to cursor aware
API calls

Change-Id: Iab0a94af598f355768f6ef48ac8cb358960418b1
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataBrokerAdapter.java [new file with mode: 0644]
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapter.java [new file with mode: 0644]
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMTransactionChainAdapter.java [new file with mode: 0644]
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMWriteTransactionAdapter.java [new file with mode: 0644]
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainReadTransaction.java [new file with mode: 0644]
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainWriteTransaction.java [new file with mode: 0644]
dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapterTest.java [new file with mode: 0644]

diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataBrokerAdapter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataBrokerAdapter.java
new file mode 100644 (file)
index 0000000..7faad87
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.mdsal.common.api.TransactionChainListener;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+
+public class ShardedDOMDataBrokerAdapter implements DOMDataBroker {
+
+    private final DOMDataTreeService service;
+    private final AtomicLong txNum = new AtomicLong();
+    private final AtomicLong chainNum = new AtomicLong();
+
+    public ShardedDOMDataBrokerAdapter(final DOMDataTreeService service) {
+        this.service = service;
+    }
+
+    @Override
+    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public DOMDataTreeReadTransaction newReadOnlyTransaction() {
+        return new ShardedDOMReadTransactionAdapter(newTransactionIdentifier(), service);
+    }
+
+    @Override
+    public DOMDataTreeWriteTransaction newWriteOnlyTransaction() {
+        return new ShardedDOMWriteTransactionAdapter(newTransactionIdentifier(), service);
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+        return new ShardedDOMTransactionChainAdapter(newChainIdentifier(), service, listener);
+    }
+
+    private Object newTransactionIdentifier() {
+        return "DOM-" + txNum.getAndIncrement();
+    }
+
+    private Object newChainIdentifier() {
+        return "DOM-CHAIN-" + chainNum;
+    }
+
+}
diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapter.java
new file mode 100644 (file)
index 0000000..c0f7ae1
--- /dev/null
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ShardedDOMReadTransactionAdapter implements DOMDataTreeReadTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMReadTransactionAdapter.class.getName());
+
+    private final List<ListenerRegistration<DOMDataTreeListener>> registrations = Lists.newArrayList();
+    private final DOMDataTreeService service;
+    private final Object txIdentifier;
+
+    private boolean finished = false;
+
+    ShardedDOMReadTransactionAdapter(final Object identifier, final DOMDataTreeService service) {
+        this.service = Preconditions.checkNotNull(service);
+        this.txIdentifier = Preconditions.checkNotNull(identifier);
+    }
+
+    @Override
+    public void close() {
+        // TODO should we also cancel all read futures?
+        LOG.debug("{}: Closing read transaction", txIdentifier);
+        if (finished == true) {
+            return;
+        }
+
+        registrations.forEach(ListenerRegistration::close);
+        finished = true;
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return txIdentifier;
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+            final YangInstanceIdentifier path) {
+        checkRunning();
+        LOG.debug("{}: Invoking read at {}:{}", txIdentifier, store, path);
+        final ListenerRegistration<DOMDataTreeListener> reg;
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> initialDataTreeChangeFuture = SettableFuture.create();
+        try {
+            reg = service.registerListener(new ReadShardedListener(initialDataTreeChangeFuture),
+                    Collections.singleton(new DOMDataTreeIdentifier(store, path)), false, Collections.emptyList());
+            registrations.add(reg);
+        } catch (final DOMDataTreeLoopException e) {
+            // This should not happen, we are not specifying any
+            // producers when registering listener
+            throw new IllegalStateException("Loop in listener and producers detected", e);
+        }
+
+        // After data tree change future is finished, we can close the listener registration
+        Futures.addCallback(initialDataTreeChangeFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+            @Override
+            public void onSuccess(@Nullable final Optional<NormalizedNode<?, ?>> result) {
+                reg.close();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                reg.close();
+            }
+        });
+
+        return Futures.makeChecked(initialDataTreeChangeFuture, ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+            final YangInstanceIdentifier path) {
+        checkRunning();
+        LOG.debug("{}: Invoking exists at {}:{}", txIdentifier, store, path);
+        final Function<Optional<NormalizedNode<?, ?>>, Boolean> transform =
+                optionalNode -> optionalNode.isPresent() ? Boolean.TRUE : Boolean.FALSE;
+        final ListenableFuture<Boolean> existsResult = Futures.transform(read(store, path), transform);
+        return Futures.makeChecked(existsResult, ReadFailedException.MAPPER);
+    }
+
+    private void checkRunning() {
+        Preconditions.checkState(finished == false, "Transaction is already closed");
+    }
+
+    static class ReadShardedListener implements DOMDataTreeListener {
+
+        private final SettableFuture<Optional<NormalizedNode<?, ?>>> readResultFuture;
+
+        public ReadShardedListener(final SettableFuture<Optional<NormalizedNode<?, ?>>> future) {
+            this.readResultFuture = Preconditions.checkNotNull(future);
+        }
+
+        @Override
+        public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
+                final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
+            Preconditions.checkState(changes.size() == 1 && subtrees.size() == 1,
+                    "DOMDataTreeListener registered exactly on one subtree");
+
+            for (final DataTreeCandidate change : changes) {
+                if (change.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) {
+                    readResultFuture.set(Optional.absent());
+                    return;
+                }
+            }
+
+            for (final NormalizedNode initialState : subtrees.values()) {
+                readResultFuture.set(Optional.of(initialState));
+            }
+        }
+
+        @Override
+        public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
+            // TODO If we get just one exception, we don't need to do
+            // chaining
+
+            // We chain all exceptions and return aggregated one
+            readResultFuture.setException(new DOMDataTreeListeningException("Aggregated DOMDataTreeListening exception",
+                    causes.stream().reduce((e1, e2) ->
+                    {
+                        e1.addSuppressed(e2);
+                        return e1;
+                    }).get()));
+        }
+    }
+}
diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMTransactionChainAdapter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMTransactionChainAdapter.java
new file mode 100644 (file)
index 0000000..e6fa8bc
--- /dev/null
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.AsyncTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionChainListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+public class ShardedDOMTransactionChainAdapter implements DOMTransactionChain {
+
+    private final DOMDataTreeService dataTreeService;
+    private final Object txChainIdentifier;
+    private final AtomicLong txNum = new AtomicLong();
+    private final TransactionChainListener txChainListener;
+    private final CachedDataTreeService cachedDataTreeService;
+    private TransactionChainWriteTransaction writeTx;
+    private TransactionChainReadTransaction readTx;
+    private ListenableFuture<Void> writeTxSubmitFuture;
+    private boolean finished = false;
+
+    public ShardedDOMTransactionChainAdapter(final Object txChainIdentifier,
+                                             final DOMDataTreeService dataTreeService,
+                                             final TransactionChainListener txChainListener) {
+        Preconditions.checkNotNull(dataTreeService);
+        Preconditions.checkNotNull(txChainIdentifier);
+        this.dataTreeService = dataTreeService;
+        this.txChainIdentifier = txChainIdentifier;
+        this.txChainListener = txChainListener;
+        this.cachedDataTreeService = new CachedDataTreeService(dataTreeService);
+    }
+
+    @Override
+    public DOMDataTreeReadTransaction newReadOnlyTransaction() {
+        checkRunning();
+        checkReadTxClosed();
+        checkWriteTxClosed();
+        readTx = new TransactionChainReadTransaction(newTransactionIdentifier(),
+                new ShardedDOMReadTransactionAdapter(newTransactionIdentifier(), dataTreeService),
+                writeTxSubmitFuture, this);
+
+        return readTx;
+    }
+
+    @Override
+    public DOMDataTreeWriteTransaction newWriteOnlyTransaction() {
+        checkRunning();
+        checkWriteTxClosed();
+        checkReadTxClosed();
+        writeTx = new TransactionChainWriteTransaction(newTransactionIdentifier(),
+                new ShardedDOMWriteTransactionAdapter(newTransactionIdentifier(),
+                        cachedDataTreeService), this);
+
+        return writeTx;
+    }
+
+    @Override
+    public void close() {
+        if (finished = true) {
+            // already closed, do nothing
+            return;
+        }
+
+        checkReadTxClosed();
+        checkWriteTxClosed();
+        Futures.addCallback(writeTxSubmitFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(@Nullable final Void result) {
+                txChainListener.onTransactionChainSuccessful(ShardedDOMTransactionChainAdapter.this);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                // We don't have to do nothing here,
+                // tx should take car of it
+            }
+        });
+
+        cachedDataTreeService.closeProducers();
+        finished = true;
+    }
+
+    public void closeReadTransaction() {
+        readTx = null;
+    }
+
+    public void closeWriteTransaction(final ListenableFuture<Void> submitFuture) {
+        writeTxSubmitFuture = submitFuture;
+        writeTx = null;
+    }
+
+    private Object newTransactionIdentifier() {
+        return "DOM-CHAIN-" + txChainIdentifier + "-" + txNum.getAndIncrement();
+    }
+
+    private void checkWriteTxClosed() {
+        Preconditions.checkState(writeTx == null);
+    }
+
+    private void checkReadTxClosed() {
+        Preconditions.checkState(readTx == null);
+    }
+
+    private void checkRunning() {
+        Preconditions.checkState(finished == false);
+    }
+
+    public void transactionFailed(final AsyncTransaction<?, ?> tx, final Throwable cause) {
+        txChainListener.onTransactionChainFailed(this, tx, cause);
+        if (writeTx != null) {
+            writeTx.cancel();
+        }
+        if (readTx != null) {
+            readTx.close();
+        }
+        cachedDataTreeService.closeProducers();
+        finished = true;
+    }
+
+    static class CachedDataTreeService implements DOMDataTreeService {
+
+        private final DOMDataTreeService delegateTreeService;
+        private final Map<LogicalDatastoreType, NoopCloseDataProducer> producersMap =
+                new EnumMap<>(LogicalDatastoreType.class);
+
+        CachedDataTreeService(final DOMDataTreeService delegateTreeService) {
+            this.delegateTreeService = delegateTreeService;
+        }
+
+        void closeProducers() {
+            producersMap.values().forEach(NoopCloseDataProducer::closeDelegate);
+        }
+
+        @Nonnull
+        @Override
+        public <T extends DOMDataTreeListener> ListenerRegistration<T>
+        registerListener(@Nonnull final T listener, @Nonnull final Collection<DOMDataTreeIdentifier> subtrees,
+                         final boolean allowRxMerges, @Nonnull final Collection<DOMDataTreeProducer> producers)
+                throws DOMDataTreeLoopException {
+            return delegateTreeService.registerListener(listener, subtrees, allowRxMerges, producers);
+        }
+
+        @Override
+        public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+            Preconditions.checkState(subtrees.size() == 1);
+            NoopCloseDataProducer producer = null;
+            for (final DOMDataTreeIdentifier treeId : subtrees) {
+                producer =
+                        new NoopCloseDataProducer(delegateTreeService.createProducer(Collections.singleton(treeId)));
+                producersMap.putIfAbsent(treeId.getDatastoreType(),
+                        producer);
+            }
+            return producer;
+        }
+
+        static class NoopCloseDataProducer implements DOMDataTreeProducer {
+
+            private final DOMDataTreeProducer delegateTreeProducer;
+
+            NoopCloseDataProducer(final DOMDataTreeProducer delegateTreeProducer) {
+                this.delegateTreeProducer = delegateTreeProducer;
+            }
+
+            @Nonnull
+            @Override
+            public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
+                return delegateTreeProducer.createTransaction(isolated);
+            }
+
+            @Nonnull
+            @Override
+            public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+                return delegateTreeProducer.createProducer(subtrees);
+            }
+
+            @Override
+            public void close() throws DOMDataTreeProducerException {
+                // noop
+            }
+
+            public void closeDelegate() {
+                try {
+                    delegateTreeProducer.close();
+                } catch (final DOMDataTreeProducerException e) {
+                    throw new IllegalStateException("Trying to close DOMDataTreeProducer with open transaction", e);
+                }
+            }
+        }
+    }
+}
diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMWriteTransactionAdapter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMWriteTransactionAdapter.java
new file mode 100644 (file)
index 0000000..2e610c4
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ShardedDOMWriteTransactionAdapter implements DOMDataTreeWriteTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMWriteTransactionAdapter.class);
+
+    private final Map<LogicalDatastoreType, DOMDataTreeProducer> producerMap;
+    private final Map<LogicalDatastoreType, DOMDataTreeCursorAwareTransaction> transactionMap;
+    private final Map<LogicalDatastoreType, DOMDataTreeWriteCursor> cursorMap;
+
+    private final DOMDataTreeService treeService;
+    private final Object txIdentifier;
+    private boolean finished = false;
+    private boolean initialized = false;
+
+    ShardedDOMWriteTransactionAdapter(final Object identifier, final DOMDataTreeService transactionDelegator) {
+        this.treeService = Preconditions.checkNotNull(transactionDelegator);
+        this.txIdentifier = Preconditions.checkNotNull(identifier);
+        this.producerMap = new EnumMap<>(LogicalDatastoreType.class);
+        this.transactionMap = new EnumMap<>(LogicalDatastoreType.class);
+        this.cursorMap = new EnumMap<>(LogicalDatastoreType.class);
+    }
+
+    @Override
+    public boolean cancel() {
+        LOG.debug("{}: Cancelling transaction");
+        if (finished == true) {
+            return false;
+        }
+
+        // We close cursor, cancel transactions and close producers and
+        // mark transaction as finished
+        cursorMap.values().forEach(DOMDataTreeWriteCursor::close);
+        transactionMap.values().forEach(domDataTreeCursorAwareTransaction ->
+                Preconditions.checkState(domDataTreeCursorAwareTransaction.cancel()));
+        closeProducers();
+        finished = true;
+        return true;
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        checkRunning();
+        LOG.debug("{}: Submitting transaction", txIdentifier);
+        if (initialized == false) {
+            // If underlying producers, transactions and cursors are
+            // not even initialized just seal this transaction and
+            // return immediate future
+            finished = true;
+            return Futures.immediateCheckedFuture(null);
+        }
+        // First we need to close cursors
+        cursorMap.values().forEach(DOMDataTreeWriteCursor::close);
+        final ListenableFuture<List<Void>> aggregatedSubmit = Futures.allAsList(
+                transactionMap.get(LogicalDatastoreType.CONFIGURATION).submit(),
+                transactionMap.get(LogicalDatastoreType.OPERATIONAL).submit());
+
+        // Now we can close producers and mark transaction as finished
+        closeProducers();
+        finished = true;
+
+        return Futures.makeChecked(
+                Futures.transform(aggregatedSubmit, (List<Void> input) -> input.get(0)),
+                TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return txIdentifier;
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+                    final NormalizedNode<?, ?> data) {
+        checkRunning();
+        LOG.debug("{}: Invoking put operation at {}:{} with payload {}", txIdentifier, store, path);
+        if (initialized == false) {
+            initializeDataTreeProducerLayer(path.getParent());
+        }
+
+        final DOMDataTreeWriteCursor cursor = cursorMap.get(store);
+        cursor.write(path.getLastPathArgument(), data);
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+                      final NormalizedNode<?, ?> data) {
+        checkRunning();
+        LOG.debug("{}: Invoking merge operation at {}:{} with payload {}", txIdentifier, store, path);
+        if (initialized == false) {
+            initializeDataTreeProducerLayer(path.getParent());
+        }
+
+        final DOMDataTreeWriteCursor cursor = cursorMap.get(store);
+        cursor.merge(path.getLastPathArgument(), data);
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        checkRunning();
+        LOG.debug("{}: Invoking delete operation at {}:{}", txIdentifier, store, path);
+        if (initialized == false) {
+            initializeDataTreeProducerLayer(path.getParent());
+        }
+
+        final DOMDataTreeWriteCursor cursor = cursorMap.get(store);
+        cursor.delete(path.getLastPathArgument());
+    }
+
+    // TODO initialize producer, transaction and cursor for only
+    // for necessary data store at one time
+    private void initializeDataTreeProducerLayer(final YangInstanceIdentifier path) {
+        Preconditions.checkState(producerMap.isEmpty(), "Producers already initialized");
+        Preconditions.checkState(transactionMap.isEmpty(), "Transactions already initialized");
+        Preconditions.checkState(cursorMap.isEmpty(), "Cursors already initialized");
+
+        LOG.debug("{}: Creating data tree producers on path {}", txIdentifier, path);
+        producerMap.put(LogicalDatastoreType.CONFIGURATION,
+                treeService.createProducer(
+                        Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path))));
+        producerMap.put(LogicalDatastoreType.OPERATIONAL,
+                treeService.createProducer(
+                        Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path))));
+
+        LOG.debug("{}: Creating DOMDataTreeCursorAwareTransactions delegates", txIdentifier, path);
+        transactionMap.put(LogicalDatastoreType.CONFIGURATION,
+                producerMap.get(LogicalDatastoreType.CONFIGURATION).createTransaction(true));
+        transactionMap.put(LogicalDatastoreType.OPERATIONAL,
+                producerMap.get(LogicalDatastoreType.OPERATIONAL).createTransaction(true));
+
+        LOG.debug("{}: Creating DOMDataTreeWriteCursors delegates");
+        cursorMap.put(LogicalDatastoreType.CONFIGURATION,
+                transactionMap.get(LogicalDatastoreType.CONFIGURATION)
+                        .createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path)));
+        cursorMap.put(LogicalDatastoreType.OPERATIONAL,
+                transactionMap.get(LogicalDatastoreType.OPERATIONAL)
+                        .createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path)));
+
+        initialized = true;
+    }
+
+    private void checkRunning() {
+        Preconditions.checkState(finished == false, "{}: Transaction already finished");
+    }
+
+    private void closeProducers() {
+        producerMap.values().forEach(domDataTreeProducer ->
+        {
+            try {
+                domDataTreeProducer.close();
+            } catch (final DOMDataTreeProducerException e) {
+                throw new IllegalStateException("Trying to close DOMDataTreeProducer with open transaction", e);
+            }
+        });
+    }
+}
diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainReadTransaction.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainReadTransaction.java
new file mode 100644 (file)
index 0000000..30e0235
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class TransactionChainReadTransaction implements DOMDataTreeReadTransaction {
+
+    private final DOMDataTreeReadTransaction delegateReadTx;
+    private final ListenableFuture<Void> previousWriteTxFuture;
+    private final Object identifier;
+    private final ShardedDOMTransactionChainAdapter txChain;
+
+    TransactionChainReadTransaction(final Object txIdentifier, final DOMDataTreeReadTransaction delegateReadTx,
+                                    final ListenableFuture<Void> previousWriteTxFuture,
+                                    final ShardedDOMTransactionChainAdapter txChain) {
+        this.delegateReadTx = delegateReadTx;
+        this.previousWriteTxFuture = previousWriteTxFuture;
+        this.identifier = txIdentifier;
+        this.txChain = txChain;
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+                                                                                   final YangInstanceIdentifier path) {
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> readResult = SettableFuture.create();
+
+        Futures.addCallback(previousWriteTxFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(@Nullable final Void result) {
+                Futures.addCallback(delegateReadTx.read(store, path),
+                        new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+                            @Override
+                            public void onSuccess(@Nullable final Optional<NormalizedNode<?, ?>> result) {
+                                readResult.set(result);
+                            }
+
+                            @Override
+                            public void onFailure(final Throwable t) {
+                                txChain.transactionFailed(TransactionChainReadTransaction.this, t);
+                                readResult.setException(t);
+                            }
+                        });
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                // we don't have to notify txchain about this failure
+                // failed write transaction should do this
+                readResult.setException(t);
+            }
+        });
+
+        return Futures.makeChecked(readResult, ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+                                                              final YangInstanceIdentifier path) {
+        final Function<Optional<NormalizedNode<?, ?>>, Boolean> transform =
+                optionalNode -> optionalNode.isPresent() ? Boolean.TRUE : Boolean.FALSE;
+        final ListenableFuture<Boolean> existsResult = Futures.transform(read(store, path), transform);
+        return Futures.makeChecked(existsResult, ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public void close() {
+        delegateReadTx.close();
+        txChain.closeReadTransaction();
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return identifier;
+    }
+}
diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainWriteTransaction.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainWriteTransaction.java
new file mode 100644 (file)
index 0000000..187bb42
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class TransactionChainWriteTransaction implements DOMDataTreeWriteTransaction {
+    private final DOMDataTreeWriteTransaction delegateTx;
+    private final Object identifier;
+    private final ShardedDOMTransactionChainAdapter txChain;
+
+    public TransactionChainWriteTransaction(final Object identifier, final DOMDataTreeWriteTransaction delegateTx,
+                                            final ShardedDOMTransactionChainAdapter txChain) {
+        this.delegateTx = Preconditions.checkNotNull(delegateTx);
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.txChain = Preconditions.checkNotNull(txChain);
+    }
+
+
+    @Override
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        delegateTx.put(store, path, data);
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        delegateTx.merge(store, path, data);
+    }
+
+    @Override
+    public boolean cancel() {
+        txChain.closeWriteTransaction(Futures.immediateFuture(null));
+        return delegateTx.cancel();
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        delegateTx.delete(store, path);
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        final CheckedFuture<Void, TransactionCommitFailedException> writeResultFuture = delegateTx.submit();
+        Futures.addCallback(writeResultFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(@Nullable final Void result) {
+                // NOOP
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                txChain.transactionFailed(TransactionChainWriteTransaction.this, t);
+            }
+        });
+
+        txChain.closeWriteTransaction(writeResultFuture);
+        return writeResultFuture;
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return identifier;
+    }
+}
diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapterTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapterTest.java
new file mode 100644 (file)
index 0000000..7c643b1
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.broker;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.broker.util.TestModel;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+
+public class ShardedDOMReadTransactionAdapterTest {
+
+    private ShardedDOMReadTransactionAdapter readTx;
+
+    @Before
+    public void setUp() {
+        readTx = new ShardedDOMReadTransactionAdapter("TEST-TX", new TestTreeService());
+    }
+
+    @Test
+    public void testGetIdentifier() {
+        assertEquals("TEST-TX", readTx.getIdentifier());
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readResult =
+                readTx.read(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+        assertEquals(readResult.checkedGet().get(), TestUtils.TEST_CONTAINER);
+    }
+
+
+    private static class TestTreeService implements DOMDataTreeService {
+
+        @Nonnull
+        @Override
+        public <T extends DOMDataTreeListener> ListenerRegistration<T>
+        registerListener(@Nonnull final T listener, @Nonnull final Collection<DOMDataTreeIdentifier> subtrees,
+                         final boolean allowRxMerges,
+                         @Nonnull final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
+            final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtree = Maps.newHashMap();
+            subtree.put(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH),
+                    TestUtils.TEST_CONTAINER);
+
+            listener.onDataTreeChanged(Collections.singleton(
+                    DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, TestUtils.TEST_CONTAINER)), subtree);
+
+            return new ListenerRegistration<T>() {
+                @Override
+                public void close() {
+                    // NOOP
+                }
+
+                @Override
+                public T getInstance() {
+                    return listener;
+                }
+            };
+        }
+
+        @Nonnull
+        @Override
+        public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+            return null;
+        }
+    }
+}
\ No newline at end of file