From: Moiz Raja Date: Wed, 22 Apr 2015 13:42:05 +0000 (-0700) Subject: Create transaction on the backend datastore only when neccessary X-Git-Tag: release/lithium~224^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f3e6688eb7028378ef0863171b9e9629605f3572 Create transaction on the backend datastore only when neccessary I've borrowed a bunch of code from the data broker because I needed to modify the code for lazy transaction creation which needed me to pass around the factories for creating the transaction on the appropriate store. Basic tests are in place for now which ensure that we do not create transactions on all the stores when not needed and that on submit we only have cohorts for the transactions that we created. The data broker still goes through the three phase commit but since now this has been optimized with the direct commit enhancement on the datastore itself it should not matter if we call one method or 4 as far as message passing goes. Calling the 4 methods may result in extra object creation and such which could be avoided. I would put that in a follow up commit. Change-Id: Id77bb1642748e7df15e084a3f0b5e580783f2f8d Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java new file mode 100644 index 0000000000..833cb49462 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import static com.google.common.base.Preconditions.checkState; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension; +import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractDOMBroker extends AbstractDOMTransactionFactory + implements DOMDataBroker, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractDOMBroker.class); + + private final AtomicLong txNum = new AtomicLong(); + private final AtomicLong chainNum = new AtomicLong(); + private final Map, DOMDataBrokerExtension> extensions; + private volatile AutoCloseable closeable; + + protected AbstractDOMBroker(final Map datastores) { + super(datastores); + + boolean treeChange = true; + for (DOMStore ds : datastores.values()) { + if (!(ds instanceof DOMStoreTreeChangePublisher)) { + treeChange = false; + break; + } + } + + if (treeChange) { + extensions = ImmutableMap., DOMDataBrokerExtension>of(DOMDataTreeChangeService.class, new DOMDataTreeChangeService() { + @Override + public ListenerRegistration registerDataTreeChangeListener(final DOMDataTreeIdentifier treeId, final L listener) { + DOMStore publisher = getTxFactories().get(treeId.getDatastoreType()); + checkState(publisher != null, "Requested logical data store is not available."); + + return ((DOMStoreTreeChangePublisher) publisher).registerTreeChangeListener(treeId.getRootIdentifier(), listener); + } + }); + } else { + extensions = Collections.emptyMap(); + } + } + + public void setCloseable(final AutoCloseable closeable) { + this.closeable = closeable; + } + + @Override + public void close() { + super.close(); + + if (closeable != null) { + try { + closeable.close(); + } catch (Exception e) { + LOG.debug("Error closing instance", e); + } + } + } + + @Override + protected Object newTransactionIdentifier() { + return "DOM-" + txNum.getAndIncrement(); + } + + @Override + public ListenerRegistration registerDataChangeListener(final LogicalDatastoreType store, + final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) { + + DOMStore potentialStore = getTxFactories().get(store); + checkState(potentialStore != null, "Requested logical data store is not available."); + return potentialStore.registerChangeListener(path, listener, triggeringScope); + } + + @Override + public Map, DOMDataBrokerExtension> getSupportedExtensions() { + return extensions; + } + + @Override + public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) { + checkNotClosed(); + + final Map backingChains = new EnumMap<>(LogicalDatastoreType.class); + for (Map.Entry entry : getTxFactories().entrySet()) { + backingChains.put(entry.getKey(), entry.getValue().createTransactionChain()); + } + + final long chainId = chainNum.getAndIncrement(); + LOG.debug("Transaction chain {} created with listener {}, backing store chains {}", chainId, listener, + backingChains); + return new DOMBrokerTransactionChain(chainId, backingChains, this, listener); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java new file mode 100644 index 0000000000..98fea88f63 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.EnumMap; +import java.util.Map; +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public abstract class AbstractDOMBrokerTransaction implements + AsyncTransaction> { + + private Map backingTxs; + private final Object identifier; + private final Map storeTxFactories; + + /** + * + * Creates new composite Transactions. + * + * @param identifier + * Identifier of transaction. + */ + protected AbstractDOMBrokerTransaction(final Object identifier, Map storeTxFactories) { + this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null"); + this.storeTxFactories = Preconditions.checkNotNull(storeTxFactories, "Store Transaction Factories should not be null"); + this.backingTxs = new EnumMap(LogicalDatastoreType.class); + } + + /** + * Returns subtransaction associated with supplied key. + * + * @param key + * @return + * @throws NullPointerException + * if key is null + * @throws IllegalArgumentException + * if no subtransaction is associated with key. + */ + protected final T getSubtransaction(final K key) { + Preconditions.checkNotNull(key, "key must not be null."); + + T ret = backingTxs.get(key); + if(ret == null){ + ret = createTransaction(key); + backingTxs.put(key, ret); + } + Preconditions.checkArgument(ret != null, "No subtransaction associated with %s", key); + return ret; + } + + protected abstract T createTransaction(final K key); + + /** + * Returns immutable Iterable of all subtransactions. + * + */ + protected Collection getSubtransactions() { + return backingTxs.values(); + } + + @Override + public Object getIdentifier() { + return identifier; + } + + protected void closeSubtransactions() { + /* + * We share one exception for all failures, which are added + * as supressedExceptions to it. + */ + IllegalStateException failure = null; + for (T subtransaction : backingTxs.values()) { + try { + subtransaction.close(); + } catch (Exception e) { + // If we did not allocated failure we allocate it + if (failure == null) { + failure = new IllegalStateException("Uncaught exception occured during closing transaction", e); + } else { + // We update it with additional exceptions, which occurred during error. + failure.addSuppressed(e); + } + } + } + // If we have failure, we throw it at after all attempts to close. + if (failure != null) { + throw failure; + } + } + + protected DOMStoreTransactionFactory getTxFactory(K type){ + return storeTxFactories.get(type); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java new file mode 100644 index 0000000000..2187c6e0f9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Collection; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; + +public abstract class AbstractDOMTransactionFactory implements AutoCloseable { + private static final AtomicIntegerFieldUpdater UPDATER = + AtomicIntegerFieldUpdater.newUpdater(AbstractDOMTransactionFactory.class, "closed"); + private final Map storeTxFactories; + private volatile int closed = 0; + + protected AbstractDOMTransactionFactory(final Map txFactories) { + this.storeTxFactories = new EnumMap<>(txFactories); + } + + /** + * Implementations must return unique identifier for each and every call of + * this method; + * + * @return new Unique transaction identifier. + */ + protected abstract Object newTransactionIdentifier(); + + /** + * + * @param transaction + * @param cohorts + * @return + */ + protected abstract CheckedFuture submit(final DOMDataWriteTransaction transaction, + final Collection cohorts); + + /** + * + * @return + */ + public final DOMDataReadOnlyTransaction newReadOnlyTransaction() { + checkNotClosed(); + + return new DOMBrokerReadOnlyTransaction(newTransactionIdentifier(), storeTxFactories); + } + + + /** + * + * @return + */ + public final DOMDataWriteTransaction newWriteOnlyTransaction() { + checkNotClosed(); + + return new DOMBrokerWriteOnlyTransaction(newTransactionIdentifier(), storeTxFactories, this); + } + + + /** + * + * @return + */ + public final DOMDataReadWriteTransaction newReadWriteTransaction() { + checkNotClosed(); + + return new DOMBrokerReadWriteTransaction<>(newTransactionIdentifier(), storeTxFactories, this); + } + + /** + * Convenience accessor of backing factories intended to be used only by + * finalization of this class. + * + * Note: + * Finalization of this class may want to access other functionality of + * supplied Transaction factories. + * + * @return Map of backing transaction factories. + */ + protected final Map getTxFactories() { + return storeTxFactories; + } + + /** + * Checks if instance is not closed. + * + * @throws IllegalStateException If instance of this class was closed. + * + */ + protected final void checkNotClosed() { + Preconditions.checkState(closed == 0, "Transaction factory was closed. No further operations allowed."); + } + + @Override + public void close() { + final boolean success = UPDATER.compareAndSet(this, 0, 1); + Preconditions.checkState(success, "Transaction factory was already closed"); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java new file mode 100644 index 0000000000..656ced37c1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Map; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class DOMBrokerReadOnlyTransaction + extends AbstractDOMBrokerTransaction + implements DOMDataReadOnlyTransaction { + /** + * Creates new composite Transactions. + * + * @param identifier Identifier of transaction. + */ + protected DOMBrokerReadOnlyTransaction(Object identifier, Map storeTxFactories) { + super(identifier, storeTxFactories); + } + + @Override + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { + return getSubtransaction(store).read(path); + } + + @Override + public CheckedFuture exists( + final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + return getSubtransaction(store).exists(path); + } + + @Override + public void close() { + closeSubtransactions(); + } + + @Override + protected T createTransaction(LogicalDatastoreType key) { + return (T) getTxFactory(key).newReadOnlyTransaction(); + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java new file mode 100644 index 0000000000..efa7226219 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Map; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class DOMBrokerReadWriteTransaction + extends DOMBrokerWriteOnlyTransaction implements DOMDataReadWriteTransaction { + /** + * Creates new composite Transactions. + * + * @param identifier Identifier of transaction. + * @param storeTxFactories + */ + protected DOMBrokerReadWriteTransaction(Object identifier, Map storeTxFactories, final AbstractDOMTransactionFactory commitImpl) { + super(identifier, storeTxFactories, commitImpl); + } + + @Override + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { + return getSubtransaction(store).read(path); + } + + @Override + public CheckedFuture exists( + final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + return getSubtransaction(store).exists(path); + } + + @Override + protected DOMStoreReadWriteTransaction createTransaction(LogicalDatastoreType key) { + return getTxFactory(key).newReadWriteTransaction(); + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java new file mode 100644 index 0000000000..9610647fff --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DOMBrokerTransactionChain extends AbstractDOMTransactionFactory + implements DOMTransactionChain { + private static enum State { + RUNNING, + CLOSING, + CLOSED, + FAILED, + } + + private static final AtomicIntegerFieldUpdater COUNTER_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, "counter"); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, State.class, "state"); + private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerTransactionChain.class); + private final AtomicLong txNum = new AtomicLong(); + private final AbstractDOMBroker broker; + private final TransactionChainListener listener; + private final long chainId; + + private volatile State state = State.RUNNING; + private volatile int counter = 0; + + /** + * + * @param chainId + * ID of transaction chain + * @param chains + * Backing {@link DOMStoreTransactionChain}s. + * @param listener + * Listener, which listens on transaction chain events. + * @throws NullPointerException + * If any of arguments is null. + */ + public DOMBrokerTransactionChain(final long chainId, + final Map chains, + AbstractDOMBroker broker, final TransactionChainListener listener) { + super(chains); + this.chainId = chainId; + this.broker = Preconditions.checkNotNull(broker); + this.listener = Preconditions.checkNotNull(listener); + } + + private void checkNotFailed() { + Preconditions.checkState(state != State.FAILED, "Transaction chain has failed"); + } + + @Override + protected Object newTransactionIdentifier() { + return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement(); + } + + @Override + public CheckedFuture submit( + final DOMDataWriteTransaction transaction, final Collection cohorts) { + checkNotFailed(); + checkNotClosed(); + + final CheckedFuture ret = broker.submit(transaction, cohorts); + + COUNTER_UPDATER.incrementAndGet(this); + Futures.addCallback(ret, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + transactionCompleted(); + } + + @Override + public void onFailure(final Throwable t) { + transactionFailed(transaction, t); + } + }); + + return ret; + } + + @Override + public void close() { + final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING); + if (!success) { + LOG.debug("Chain {} is no longer running", this); + return; + } + + super.close(); + for (DOMStoreTransactionChain subChain : getTxFactories().values()) { + subChain.close(); + } + + if (counter == 0) { + finishClose(); + } + } + + private void finishClose() { + state = State.CLOSED; + listener.onTransactionChainSuccessful(this); + } + + private void transactionCompleted() { + if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) { + finishClose(); + } + } + + private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) { + state = State.FAILED; + LOG.debug("Transaction chain {} failed.", this, cause); + listener.onTransactionChainFailed(this, tx, cause); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java new file mode 100644 index 0000000000..6d00210629 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DOMBrokerWriteOnlyTransaction + extends AbstractDOMBrokerTransaction implements DOMDataWriteTransaction { + + private static final AtomicReferenceFieldUpdater IMPL_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, AbstractDOMTransactionFactory.class, "commitImpl"); + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater FUTURE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, Future.class, "commitFuture"); + private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerWriteOnlyTransaction.class); + private static final Future CANCELLED_FUTURE = Futures.immediateCancelledFuture(); + + /** + * Implementation of real commit. It also acts as an indication that + * the transaction is running -- which we flip atomically using + * {@link #IMPL_UPDATER}. + */ + private volatile AbstractDOMTransactionFactory commitImpl; + + /** + * Future task of transaction commit. It starts off as null, but is + * set appropriately on {@link #submit()} and {@link #cancel()} via + * {@link AtomicReferenceFieldUpdater#lazySet(Object, Object)}. + * + * Lazy set is safe for use because it is only referenced to in the + * {@link #cancel()} slow path, where we will busy-wait for it. The + * fast path gets the benefit of a store-store barrier instead of the + * usual store-load barrier. + */ + private volatile Future commitFuture; + + protected DOMBrokerWriteOnlyTransaction(final Object identifier, + Map storeTxFactories, final AbstractDOMTransactionFactory commitImpl) { + super(identifier, storeTxFactories); + this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null."); + } + + @Override + protected T createTransaction(LogicalDatastoreType key) { + // FIXME : Casting shouldn't be necessary here + return (T) getTxFactory(key).newWriteOnlyTransaction(); + } + + @Override + public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) { + checkRunning(commitImpl); + getSubtransaction(store).write(path, data); + } + + @Override + public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + checkRunning(commitImpl); + getSubtransaction(store).delete(path); + } + + @Override + public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) { + checkRunning(commitImpl); + getSubtransaction(store).merge(path, data); + } + + @Override + public boolean cancel() { + final AbstractDOMTransactionFactory impl = IMPL_UPDATER.getAndSet(this, null); + if (impl != null) { + LOG.trace("Transaction {} cancelled before submit", getIdentifier()); + FUTURE_UPDATER.lazySet(this, CANCELLED_FUTURE); + closeSubtransactions(); + return true; + } + + // The transaction is in process of being submitted or cancelled. Busy-wait + // for the corresponding future. + Future future; + do { + future = commitFuture; + } while (future == null); + + return future.cancel(false); + } + + @Deprecated + @Override + public ListenableFuture> commit() { + return AbstractDataTransaction.convertToLegacyCommitFuture(submit()); + } + + @Override + public CheckedFuture submit() { + final AbstractDOMTransactionFactory impl = IMPL_UPDATER.getAndSet(this, null); + checkRunning(impl); + + final Collection txns = getSubtransactions(); + final Collection cohorts = new ArrayList<>(txns.size()); + + // FIXME: deal with errors thrown by backed (ready and submit can fail in theory) + for (DOMStoreWriteTransaction txn : txns) { + cohorts.add(txn.ready()); + } + + final CheckedFuture ret = impl.submit(this, cohorts); + FUTURE_UPDATER.lazySet(this, ret); + return ret; + } + + private void checkRunning(final AbstractDOMTransactionFactory impl) { + Preconditions.checkState(impl != null, "Transaction %s is no longer running", getIdentifier()); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java index 538f2981da..06a43a6026 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java @@ -21,10 +21,10 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker; import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -34,13 +34,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3 + * ConcurrentDOMDataBroker commits transactions concurrently. The 3 * commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking * (ie async) per transaction but multiple transaction commits can run concurrent. * * @author Thomas Pantelis */ -public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { +public class ConcurrentDOMDataBroker extends AbstractDOMBroker { private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class); private static final String CAN_COMMIT = "CAN_COMMIT"; private static final String PRE_COMMIT = "PRE_COMMIT"; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java index 0b166f5ac8..80e25a167d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java @@ -9,11 +9,15 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; @@ -22,7 +26,10 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -36,11 +43,21 @@ import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; /** * Unit tests for DOMConcurrentDataCommitCoordinator. @@ -266,4 +283,229 @@ public class ConcurrentDOMDataBrokerTest { assertFailure(future, cause, mockCohort1, mockCohort2); } + + @Test + public void testCreateReadWriteTransaction(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + dataBroker.newReadWriteTransaction(); + + verify(domStore, never()).newReadWriteTransaction(); + } + + + @Test + public void testCreateWriteOnlyTransaction(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + dataBroker.newWriteOnlyTransaction(); + + verify(domStore, never()).newWriteOnlyTransaction(); + } + + @Test + public void testCreateReadOnlyTransaction(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + dataBroker.newReadOnlyTransaction(); + + verify(domStore, never()).newReadOnlyTransaction(); + } + + @Test + public void testLazySubTransactionCreationForReadWriteTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction(); + doReturn(storeTxn).when(configDomStore).newReadWriteTransaction(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor); + DOMDataReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction(); + + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, never()).newReadWriteTransaction(); + verify(operationalDomStore, times(1)).newReadWriteTransaction(); + + dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, times(1)).newReadWriteTransaction(); + verify(operationalDomStore, times(1)).newReadWriteTransaction(); + + } + + @Test + public void testLazySubTransactionCreationForWriteOnlyTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction(); + doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor); + DOMDataWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction(); + + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, never()).newWriteOnlyTransaction(); + verify(operationalDomStore, times(1)).newWriteOnlyTransaction(); + + dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, times(1)).newWriteOnlyTransaction(); + verify(operationalDomStore, times(1)).newWriteOnlyTransaction(); + + } + + + @Test + public void testLazySubTransactionCreationForReadOnlyTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction(); + doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor); + DOMDataReadOnlyTransaction dataTxn = dataBroker.newReadOnlyTransaction(); + + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, never()).newReadOnlyTransaction(); + verify(operationalDomStore, times(1)).newReadOnlyTransaction(); + + dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, times(1)).newReadOnlyTransaction(); + verify(operationalDomStore, times(1)).newReadOnlyTransaction(); + + } + + @Test + public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException { + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class); + + doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction(); + doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohort).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort).abort(); + + final CountDownLatch latch = new CountDownLatch(1); + final List commitCohorts = new ArrayList(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) { + @Override + public CheckedFuture submit(DOMDataWriteTransaction transaction, Collection cohorts) { + commitCohorts.addAll(cohorts); + latch.countDown(); + return super.submit(transaction, cohorts); + } + }; + DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction(); + + domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + domDataReadWriteTransaction.submit(); + + latch.await(10, TimeUnit.SECONDS); + + assertTrue(commitCohorts.size() == 1); + } + + @Test + public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException { + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class); + DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class); + + doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction(); + doReturn(configTransaction).when(configDomStore).newReadWriteTransaction(); + + doReturn(mockCohortOperational).when(operationalTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohortOperational).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohortOperational).abort(); + + doReturn(mockCohortConfig).when(configTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohortConfig).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohortConfig).abort(); + + + final CountDownLatch latch = new CountDownLatch(1); + final List commitCohorts = new ArrayList(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) { + @Override + public CheckedFuture submit(DOMDataWriteTransaction transaction, Collection cohorts) { + commitCohorts.addAll(cohorts); + latch.countDown(); + return super.submit(transaction, cohorts); + } + }; + DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction(); + + domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + domDataReadWriteTransaction.submit(); + + latch.await(10, TimeUnit.SECONDS); + + assertTrue(commitCohorts.size() == 2); + } + + @Test + public void testCreateTransactionChain(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + + dataBroker.createTransactionChain(mock(TransactionChainListener.class)); + + verify(domStore, times(2)).createTransactionChain(); + + } + + @Test + public void testCreateTransactionOnChain(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + + DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class); + + doReturn(mockChain).when(domStore).createTransactionChain(); + doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction(); + + DOMTransactionChain transactionChain = dataBroker.createTransactionChain(mock(TransactionChainListener.class)); + + DOMDataWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction(); + + verify(mockChain, never()).newWriteOnlyTransaction(); + + domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + } + }