From dd7ad777991f47037b10832d843b82a24daf0a1d Mon Sep 17 00:00:00 2001 From: Jakub Morvay Date: Mon, 1 Aug 2016 09:36:39 +0200 Subject: [PATCH] Bug 5699 - Migrate existing code to use new sharding apis This adds adapters that translates DOMDataBroker API calls to cursor aware API calls Change-Id: Iab0a94af598f355768f6ef48ac8cb358960418b1 Signed-off-by: Jakub Morvay --- .../broker/ShardedDOMDataBrokerAdapter.java | 60 +++++ .../ShardedDOMReadTransactionAdapter.java | 161 +++++++++++++ .../ShardedDOMTransactionChainAdapter.java | 217 ++++++++++++++++++ .../ShardedDOMWriteTransactionAdapter.java | 185 +++++++++++++++ .../TransactionChainReadTransaction.java | 95 ++++++++ .../TransactionChainWriteTransaction.java | 79 +++++++ .../ShardedDOMReadTransactionAdapterTest.java | 90 ++++++++ 7 files changed, 887 insertions(+) create mode 100644 dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataBrokerAdapter.java create mode 100644 dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapter.java create mode 100644 dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMTransactionChainAdapter.java create mode 100644 dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMWriteTransactionAdapter.java create mode 100644 dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainReadTransaction.java create mode 100644 dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainWriteTransaction.java create mode 100644 dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapterTest.java diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataBrokerAdapter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataBrokerAdapter.java new file mode 100644 index 0000000000..7faad87787 --- /dev/null +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataBrokerAdapter.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.broker; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.opendaylight.mdsal.common.api.TransactionChainListener; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; + +public class ShardedDOMDataBrokerAdapter implements DOMDataBroker { + + private final DOMDataTreeService service; + private final AtomicLong txNum = new AtomicLong(); + private final AtomicLong chainNum = new AtomicLong(); + + public ShardedDOMDataBrokerAdapter(final DOMDataTreeService service) { + this.service = service; + } + + @Override + public Map, DOMDataBrokerExtension> getSupportedExtensions() { + return Collections.emptyMap(); + } + + @Override + public DOMDataTreeReadTransaction newReadOnlyTransaction() { + return new ShardedDOMReadTransactionAdapter(newTransactionIdentifier(), service); + } + + @Override + public DOMDataTreeWriteTransaction newWriteOnlyTransaction() { + return new ShardedDOMWriteTransactionAdapter(newTransactionIdentifier(), service); + } + + @Override + public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) { + return new ShardedDOMTransactionChainAdapter(newChainIdentifier(), service, listener); + } + + private Object newTransactionIdentifier() { + return "DOM-" + txNum.getAndIncrement(); + } + + private Object newChainIdentifier() { + return "DOM-CHAIN-" + chainNum; + } + +} diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapter.java new file mode 100644 index 0000000000..c0f7ae1875 --- /dev/null +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapter.java @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.broker; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ShardedDOMReadTransactionAdapter implements DOMDataTreeReadTransaction { + + private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMReadTransactionAdapter.class.getName()); + + private final List> registrations = Lists.newArrayList(); + private final DOMDataTreeService service; + private final Object txIdentifier; + + private boolean finished = false; + + ShardedDOMReadTransactionAdapter(final Object identifier, final DOMDataTreeService service) { + this.service = Preconditions.checkNotNull(service); + this.txIdentifier = Preconditions.checkNotNull(identifier); + } + + @Override + public void close() { + // TODO should we also cancel all read futures? + LOG.debug("{}: Closing read transaction", txIdentifier); + if (finished == true) { + return; + } + + registrations.forEach(ListenerRegistration::close); + finished = true; + } + + @Override + public Object getIdentifier() { + return txIdentifier; + } + + @Override + public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + checkRunning(); + LOG.debug("{}: Invoking read at {}:{}", txIdentifier, store, path); + final ListenerRegistration reg; + final SettableFuture>> initialDataTreeChangeFuture = SettableFuture.create(); + try { + reg = service.registerListener(new ReadShardedListener(initialDataTreeChangeFuture), + Collections.singleton(new DOMDataTreeIdentifier(store, path)), false, Collections.emptyList()); + registrations.add(reg); + } catch (final DOMDataTreeLoopException e) { + // This should not happen, we are not specifying any + // producers when registering listener + throw new IllegalStateException("Loop in listener and producers detected", e); + } + + // After data tree change future is finished, we can close the listener registration + Futures.addCallback(initialDataTreeChangeFuture, new FutureCallback>>() { + @Override + public void onSuccess(@Nullable final Optional> result) { + reg.close(); + } + + @Override + public void onFailure(final Throwable t) { + reg.close(); + } + }); + + return Futures.makeChecked(initialDataTreeChangeFuture, ReadFailedException.MAPPER); + } + + @Override + public CheckedFuture exists(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + checkRunning(); + LOG.debug("{}: Invoking exists at {}:{}", txIdentifier, store, path); + final Function>, Boolean> transform = + optionalNode -> optionalNode.isPresent() ? Boolean.TRUE : Boolean.FALSE; + final ListenableFuture existsResult = Futures.transform(read(store, path), transform); + return Futures.makeChecked(existsResult, ReadFailedException.MAPPER); + } + + private void checkRunning() { + Preconditions.checkState(finished == false, "Transaction is already closed"); + } + + static class ReadShardedListener implements DOMDataTreeListener { + + private final SettableFuture>> readResultFuture; + + public ReadShardedListener(final SettableFuture>> future) { + this.readResultFuture = Preconditions.checkNotNull(future); + } + + @Override + public void onDataTreeChanged(final Collection changes, + final Map> subtrees) { + Preconditions.checkState(changes.size() == 1 && subtrees.size() == 1, + "DOMDataTreeListener registered exactly on one subtree"); + + for (final DataTreeCandidate change : changes) { + if (change.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) { + readResultFuture.set(Optional.absent()); + return; + } + } + + for (final NormalizedNode initialState : subtrees.values()) { + readResultFuture.set(Optional.of(initialState)); + } + } + + @Override + public void onDataTreeFailed(final Collection causes) { + // TODO If we get just one exception, we don't need to do + // chaining + + // We chain all exceptions and return aggregated one + readResultFuture.setException(new DOMDataTreeListeningException("Aggregated DOMDataTreeListening exception", + causes.stream().reduce((e1, e2) -> + { + e1.addSuppressed(e2); + return e1; + }).get())); + } + } +} diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMTransactionChainAdapter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMTransactionChainAdapter.java new file mode 100644 index 0000000000..e6fa8bce3f --- /dev/null +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMTransactionChainAdapter.java @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.broker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.mdsal.common.api.AsyncTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionChainListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.yangtools.concepts.ListenerRegistration; + +public class ShardedDOMTransactionChainAdapter implements DOMTransactionChain { + + private final DOMDataTreeService dataTreeService; + private final Object txChainIdentifier; + private final AtomicLong txNum = new AtomicLong(); + private final TransactionChainListener txChainListener; + private final CachedDataTreeService cachedDataTreeService; + private TransactionChainWriteTransaction writeTx; + private TransactionChainReadTransaction readTx; + private ListenableFuture writeTxSubmitFuture; + private boolean finished = false; + + public ShardedDOMTransactionChainAdapter(final Object txChainIdentifier, + final DOMDataTreeService dataTreeService, + final TransactionChainListener txChainListener) { + Preconditions.checkNotNull(dataTreeService); + Preconditions.checkNotNull(txChainIdentifier); + this.dataTreeService = dataTreeService; + this.txChainIdentifier = txChainIdentifier; + this.txChainListener = txChainListener; + this.cachedDataTreeService = new CachedDataTreeService(dataTreeService); + } + + @Override + public DOMDataTreeReadTransaction newReadOnlyTransaction() { + checkRunning(); + checkReadTxClosed(); + checkWriteTxClosed(); + readTx = new TransactionChainReadTransaction(newTransactionIdentifier(), + new ShardedDOMReadTransactionAdapter(newTransactionIdentifier(), dataTreeService), + writeTxSubmitFuture, this); + + return readTx; + } + + @Override + public DOMDataTreeWriteTransaction newWriteOnlyTransaction() { + checkRunning(); + checkWriteTxClosed(); + checkReadTxClosed(); + writeTx = new TransactionChainWriteTransaction(newTransactionIdentifier(), + new ShardedDOMWriteTransactionAdapter(newTransactionIdentifier(), + cachedDataTreeService), this); + + return writeTx; + } + + @Override + public void close() { + if (finished = true) { + // already closed, do nothing + return; + } + + checkReadTxClosed(); + checkWriteTxClosed(); + Futures.addCallback(writeTxSubmitFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void result) { + txChainListener.onTransactionChainSuccessful(ShardedDOMTransactionChainAdapter.this); + } + + @Override + public void onFailure(final Throwable t) { + // We don't have to do nothing here, + // tx should take car of it + } + }); + + cachedDataTreeService.closeProducers(); + finished = true; + } + + public void closeReadTransaction() { + readTx = null; + } + + public void closeWriteTransaction(final ListenableFuture submitFuture) { + writeTxSubmitFuture = submitFuture; + writeTx = null; + } + + private Object newTransactionIdentifier() { + return "DOM-CHAIN-" + txChainIdentifier + "-" + txNum.getAndIncrement(); + } + + private void checkWriteTxClosed() { + Preconditions.checkState(writeTx == null); + } + + private void checkReadTxClosed() { + Preconditions.checkState(readTx == null); + } + + private void checkRunning() { + Preconditions.checkState(finished == false); + } + + public void transactionFailed(final AsyncTransaction tx, final Throwable cause) { + txChainListener.onTransactionChainFailed(this, tx, cause); + if (writeTx != null) { + writeTx.cancel(); + } + if (readTx != null) { + readTx.close(); + } + cachedDataTreeService.closeProducers(); + finished = true; + } + + static class CachedDataTreeService implements DOMDataTreeService { + + private final DOMDataTreeService delegateTreeService; + private final Map producersMap = + new EnumMap<>(LogicalDatastoreType.class); + + CachedDataTreeService(final DOMDataTreeService delegateTreeService) { + this.delegateTreeService = delegateTreeService; + } + + void closeProducers() { + producersMap.values().forEach(NoopCloseDataProducer::closeDelegate); + } + + @Nonnull + @Override + public ListenerRegistration + registerListener(@Nonnull final T listener, @Nonnull final Collection subtrees, + final boolean allowRxMerges, @Nonnull final Collection producers) + throws DOMDataTreeLoopException { + return delegateTreeService.registerListener(listener, subtrees, allowRxMerges, producers); + } + + @Override + public DOMDataTreeProducer createProducer(@Nonnull final Collection subtrees) { + Preconditions.checkState(subtrees.size() == 1); + NoopCloseDataProducer producer = null; + for (final DOMDataTreeIdentifier treeId : subtrees) { + producer = + new NoopCloseDataProducer(delegateTreeService.createProducer(Collections.singleton(treeId))); + producersMap.putIfAbsent(treeId.getDatastoreType(), + producer); + } + return producer; + } + + static class NoopCloseDataProducer implements DOMDataTreeProducer { + + private final DOMDataTreeProducer delegateTreeProducer; + + NoopCloseDataProducer(final DOMDataTreeProducer delegateTreeProducer) { + this.delegateTreeProducer = delegateTreeProducer; + } + + @Nonnull + @Override + public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) { + return delegateTreeProducer.createTransaction(isolated); + } + + @Nonnull + @Override + public DOMDataTreeProducer createProducer(@Nonnull final Collection subtrees) { + return delegateTreeProducer.createProducer(subtrees); + } + + @Override + public void close() throws DOMDataTreeProducerException { + // noop + } + + public void closeDelegate() { + try { + delegateTreeProducer.close(); + } catch (final DOMDataTreeProducerException e) { + throw new IllegalStateException("Trying to close DOMDataTreeProducer with open transaction", e); + } + } + } + } +} diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMWriteTransactionAdapter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMWriteTransactionAdapter.java new file mode 100644 index 0000000000..2e610c4560 --- /dev/null +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMWriteTransactionAdapter.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.broker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ShardedDOMWriteTransactionAdapter implements DOMDataTreeWriteTransaction { + private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMWriteTransactionAdapter.class); + + private final Map producerMap; + private final Map transactionMap; + private final Map cursorMap; + + private final DOMDataTreeService treeService; + private final Object txIdentifier; + private boolean finished = false; + private boolean initialized = false; + + ShardedDOMWriteTransactionAdapter(final Object identifier, final DOMDataTreeService transactionDelegator) { + this.treeService = Preconditions.checkNotNull(transactionDelegator); + this.txIdentifier = Preconditions.checkNotNull(identifier); + this.producerMap = new EnumMap<>(LogicalDatastoreType.class); + this.transactionMap = new EnumMap<>(LogicalDatastoreType.class); + this.cursorMap = new EnumMap<>(LogicalDatastoreType.class); + } + + @Override + public boolean cancel() { + LOG.debug("{}: Cancelling transaction"); + if (finished == true) { + return false; + } + + // We close cursor, cancel transactions and close producers and + // mark transaction as finished + cursorMap.values().forEach(DOMDataTreeWriteCursor::close); + transactionMap.values().forEach(domDataTreeCursorAwareTransaction -> + Preconditions.checkState(domDataTreeCursorAwareTransaction.cancel())); + closeProducers(); + finished = true; + return true; + } + + @Override + public CheckedFuture submit() { + checkRunning(); + LOG.debug("{}: Submitting transaction", txIdentifier); + if (initialized == false) { + // If underlying producers, transactions and cursors are + // not even initialized just seal this transaction and + // return immediate future + finished = true; + return Futures.immediateCheckedFuture(null); + } + // First we need to close cursors + cursorMap.values().forEach(DOMDataTreeWriteCursor::close); + final ListenableFuture> aggregatedSubmit = Futures.allAsList( + transactionMap.get(LogicalDatastoreType.CONFIGURATION).submit(), + transactionMap.get(LogicalDatastoreType.OPERATIONAL).submit()); + + // Now we can close producers and mark transaction as finished + closeProducers(); + finished = true; + + return Futures.makeChecked( + Futures.transform(aggregatedSubmit, (List input) -> input.get(0)), + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + } + + @Override + public Object getIdentifier() { + return txIdentifier; + } + + @Override + public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, + final NormalizedNode data) { + checkRunning(); + LOG.debug("{}: Invoking put operation at {}:{} with payload {}", txIdentifier, store, path); + if (initialized == false) { + initializeDataTreeProducerLayer(path.getParent()); + } + + final DOMDataTreeWriteCursor cursor = cursorMap.get(store); + cursor.write(path.getLastPathArgument(), data); + } + + @Override + public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, + final NormalizedNode data) { + checkRunning(); + LOG.debug("{}: Invoking merge operation at {}:{} with payload {}", txIdentifier, store, path); + if (initialized == false) { + initializeDataTreeProducerLayer(path.getParent()); + } + + final DOMDataTreeWriteCursor cursor = cursorMap.get(store); + cursor.merge(path.getLastPathArgument(), data); + } + + @Override + public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + checkRunning(); + LOG.debug("{}: Invoking delete operation at {}:{}", txIdentifier, store, path); + if (initialized == false) { + initializeDataTreeProducerLayer(path.getParent()); + } + + final DOMDataTreeWriteCursor cursor = cursorMap.get(store); + cursor.delete(path.getLastPathArgument()); + } + + // TODO initialize producer, transaction and cursor for only + // for necessary data store at one time + private void initializeDataTreeProducerLayer(final YangInstanceIdentifier path) { + Preconditions.checkState(producerMap.isEmpty(), "Producers already initialized"); + Preconditions.checkState(transactionMap.isEmpty(), "Transactions already initialized"); + Preconditions.checkState(cursorMap.isEmpty(), "Cursors already initialized"); + + LOG.debug("{}: Creating data tree producers on path {}", txIdentifier, path); + producerMap.put(LogicalDatastoreType.CONFIGURATION, + treeService.createProducer( + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path)))); + producerMap.put(LogicalDatastoreType.OPERATIONAL, + treeService.createProducer( + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path)))); + + LOG.debug("{}: Creating DOMDataTreeCursorAwareTransactions delegates", txIdentifier, path); + transactionMap.put(LogicalDatastoreType.CONFIGURATION, + producerMap.get(LogicalDatastoreType.CONFIGURATION).createTransaction(true)); + transactionMap.put(LogicalDatastoreType.OPERATIONAL, + producerMap.get(LogicalDatastoreType.OPERATIONAL).createTransaction(true)); + + LOG.debug("{}: Creating DOMDataTreeWriteCursors delegates"); + cursorMap.put(LogicalDatastoreType.CONFIGURATION, + transactionMap.get(LogicalDatastoreType.CONFIGURATION) + .createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path))); + cursorMap.put(LogicalDatastoreType.OPERATIONAL, + transactionMap.get(LogicalDatastoreType.OPERATIONAL) + .createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path))); + + initialized = true; + } + + private void checkRunning() { + Preconditions.checkState(finished == false, "{}: Transaction already finished"); + } + + private void closeProducers() { + producerMap.values().forEach(domDataTreeProducer -> + { + try { + domDataTreeProducer.close(); + } catch (final DOMDataTreeProducerException e) { + throw new IllegalStateException("Trying to close DOMDataTreeProducer with open transaction", e); + } + }); + } +} diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainReadTransaction.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainReadTransaction.java new file mode 100644 index 0000000000..30e0235ff6 --- /dev/null +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainReadTransaction.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.broker; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import javax.annotation.Nullable; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class TransactionChainReadTransaction implements DOMDataTreeReadTransaction { + + private final DOMDataTreeReadTransaction delegateReadTx; + private final ListenableFuture previousWriteTxFuture; + private final Object identifier; + private final ShardedDOMTransactionChainAdapter txChain; + + TransactionChainReadTransaction(final Object txIdentifier, final DOMDataTreeReadTransaction delegateReadTx, + final ListenableFuture previousWriteTxFuture, + final ShardedDOMTransactionChainAdapter txChain) { + this.delegateReadTx = delegateReadTx; + this.previousWriteTxFuture = previousWriteTxFuture; + this.identifier = txIdentifier; + this.txChain = txChain; + } + + @Override + public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + final SettableFuture>> readResult = SettableFuture.create(); + + Futures.addCallback(previousWriteTxFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void result) { + Futures.addCallback(delegateReadTx.read(store, path), + new FutureCallback>>() { + + @Override + public void onSuccess(@Nullable final Optional> result) { + readResult.set(result); + } + + @Override + public void onFailure(final Throwable t) { + txChain.transactionFailed(TransactionChainReadTransaction.this, t); + readResult.setException(t); + } + }); + } + + @Override + public void onFailure(final Throwable t) { + // we don't have to notify txchain about this failure + // failed write transaction should do this + readResult.setException(t); + } + }); + + return Futures.makeChecked(readResult, ReadFailedException.MAPPER); + } + + @Override + public CheckedFuture exists(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + final Function>, Boolean> transform = + optionalNode -> optionalNode.isPresent() ? Boolean.TRUE : Boolean.FALSE; + final ListenableFuture existsResult = Futures.transform(read(store, path), transform); + return Futures.makeChecked(existsResult, ReadFailedException.MAPPER); + } + + @Override + public void close() { + delegateReadTx.close(); + txChain.closeReadTransaction(); + } + + @Override + public Object getIdentifier() { + return identifier; + } +} diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainWriteTransaction.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainWriteTransaction.java new file mode 100644 index 0000000000..187bb42690 --- /dev/null +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/TransactionChainWriteTransaction.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.broker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import javax.annotation.Nullable; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class TransactionChainWriteTransaction implements DOMDataTreeWriteTransaction { + private final DOMDataTreeWriteTransaction delegateTx; + private final Object identifier; + private final ShardedDOMTransactionChainAdapter txChain; + + public TransactionChainWriteTransaction(final Object identifier, final DOMDataTreeWriteTransaction delegateTx, + final ShardedDOMTransactionChainAdapter txChain) { + this.delegateTx = Preconditions.checkNotNull(delegateTx); + this.identifier = Preconditions.checkNotNull(identifier); + this.txChain = Preconditions.checkNotNull(txChain); + } + + + @Override + public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) { + delegateTx.put(store, path, data); + } + + @Override + public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) { + delegateTx.merge(store, path, data); + } + + @Override + public boolean cancel() { + txChain.closeWriteTransaction(Futures.immediateFuture(null)); + return delegateTx.cancel(); + } + + @Override + public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + delegateTx.delete(store, path); + } + + @Override + public CheckedFuture submit() { + final CheckedFuture writeResultFuture = delegateTx.submit(); + Futures.addCallback(writeResultFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void result) { + // NOOP + } + + @Override + public void onFailure(final Throwable t) { + txChain.transactionFailed(TransactionChainWriteTransaction.this, t); + } + }); + + txChain.closeWriteTransaction(writeResultFuture); + return writeResultFuture; + } + + @Override + public Object getIdentifier() { + return identifier; + } +} diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapterTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapterTest.java new file mode 100644 index 0000000000..7c643b1d29 --- /dev/null +++ b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMReadTransactionAdapterTest.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.broker; + +import static org.junit.Assert.assertEquals; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import javax.annotation.Nonnull; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; +import org.opendaylight.mdsal.dom.broker.util.TestModel; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; + +public class ShardedDOMReadTransactionAdapterTest { + + private ShardedDOMReadTransactionAdapter readTx; + + @Before + public void setUp() { + readTx = new ShardedDOMReadTransactionAdapter("TEST-TX", new TestTreeService()); + } + + @Test + public void testGetIdentifier() { + assertEquals("TEST-TX", readTx.getIdentifier()); + } + + @Test + public void testRead() throws Exception { + final CheckedFuture>, ReadFailedException> readResult = + readTx.read(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + assertEquals(readResult.checkedGet().get(), TestUtils.TEST_CONTAINER); + } + + + private static class TestTreeService implements DOMDataTreeService { + + @Nonnull + @Override + public ListenerRegistration + registerListener(@Nonnull final T listener, @Nonnull final Collection subtrees, + final boolean allowRxMerges, + @Nonnull final Collection producers) throws DOMDataTreeLoopException { + final Map> subtree = Maps.newHashMap(); + subtree.put(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH), + TestUtils.TEST_CONTAINER); + + listener.onDataTreeChanged(Collections.singleton( + DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, TestUtils.TEST_CONTAINER)), subtree); + + return new ListenerRegistration() { + @Override + public void close() { + // NOOP + } + + @Override + public T getInstance() { + return listener; + } + }; + } + + @Nonnull + @Override + public DOMDataTreeProducer createProducer(@Nonnull final Collection subtrees) { + return null; + } + } +} \ No newline at end of file -- 2.36.6