From: Tony Tkacik Date: Fri, 24 Apr 2015 10:09:48 +0000 (+0000) Subject: Merge "Create transaction on the backend datastore only when neccessary" X-Git-Tag: release/lithium~224 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2b2517144e4eb9c17d9b41e9d9ec20d0264f5e12;hp=5f553b1657b97adbeb59b8e346d6eed8148b281b Merge "Create transaction on the backend datastore only when neccessary" --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java new file mode 100644 index 0000000000..833cb49462 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import static com.google.common.base.Preconditions.checkState; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension; +import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractDOMBroker extends AbstractDOMTransactionFactory + implements DOMDataBroker, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractDOMBroker.class); + + private final AtomicLong txNum = new AtomicLong(); + private final AtomicLong chainNum = new AtomicLong(); + private final Map, DOMDataBrokerExtension> extensions; + private volatile AutoCloseable closeable; + + protected AbstractDOMBroker(final Map datastores) { + super(datastores); + + boolean treeChange = true; + for (DOMStore ds : datastores.values()) { + if (!(ds instanceof DOMStoreTreeChangePublisher)) { + treeChange = false; + break; + } + } + + if (treeChange) { + extensions = ImmutableMap., DOMDataBrokerExtension>of(DOMDataTreeChangeService.class, new DOMDataTreeChangeService() { + @Override + public ListenerRegistration registerDataTreeChangeListener(final DOMDataTreeIdentifier treeId, final L listener) { + DOMStore publisher = getTxFactories().get(treeId.getDatastoreType()); + checkState(publisher != null, "Requested logical data store is not available."); + + return ((DOMStoreTreeChangePublisher) publisher).registerTreeChangeListener(treeId.getRootIdentifier(), listener); + } + }); + } else { + extensions = Collections.emptyMap(); + } + } + + public void setCloseable(final AutoCloseable closeable) { + this.closeable = closeable; + } + + @Override + public void close() { + super.close(); + + if (closeable != null) { + try { + closeable.close(); + } catch (Exception e) { + LOG.debug("Error closing instance", e); + } + } + } + + @Override + protected Object newTransactionIdentifier() { + return "DOM-" + txNum.getAndIncrement(); + } + + @Override + public ListenerRegistration registerDataChangeListener(final LogicalDatastoreType store, + final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) { + + DOMStore potentialStore = getTxFactories().get(store); + checkState(potentialStore != null, "Requested logical data store is not available."); + return potentialStore.registerChangeListener(path, listener, triggeringScope); + } + + @Override + public Map, DOMDataBrokerExtension> getSupportedExtensions() { + return extensions; + } + + @Override + public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) { + checkNotClosed(); + + final Map backingChains = new EnumMap<>(LogicalDatastoreType.class); + for (Map.Entry entry : getTxFactories().entrySet()) { + backingChains.put(entry.getKey(), entry.getValue().createTransactionChain()); + } + + final long chainId = chainNum.getAndIncrement(); + LOG.debug("Transaction chain {} created with listener {}, backing store chains {}", chainId, listener, + backingChains); + return new DOMBrokerTransactionChain(chainId, backingChains, this, listener); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java new file mode 100644 index 0000000000..98fea88f63 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.EnumMap; +import java.util.Map; +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public abstract class AbstractDOMBrokerTransaction implements + AsyncTransaction> { + + private Map backingTxs; + private final Object identifier; + private final Map storeTxFactories; + + /** + * + * Creates new composite Transactions. + * + * @param identifier + * Identifier of transaction. + */ + protected AbstractDOMBrokerTransaction(final Object identifier, Map storeTxFactories) { + this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null"); + this.storeTxFactories = Preconditions.checkNotNull(storeTxFactories, "Store Transaction Factories should not be null"); + this.backingTxs = new EnumMap(LogicalDatastoreType.class); + } + + /** + * Returns subtransaction associated with supplied key. + * + * @param key + * @return + * @throws NullPointerException + * if key is null + * @throws IllegalArgumentException + * if no subtransaction is associated with key. + */ + protected final T getSubtransaction(final K key) { + Preconditions.checkNotNull(key, "key must not be null."); + + T ret = backingTxs.get(key); + if(ret == null){ + ret = createTransaction(key); + backingTxs.put(key, ret); + } + Preconditions.checkArgument(ret != null, "No subtransaction associated with %s", key); + return ret; + } + + protected abstract T createTransaction(final K key); + + /** + * Returns immutable Iterable of all subtransactions. + * + */ + protected Collection getSubtransactions() { + return backingTxs.values(); + } + + @Override + public Object getIdentifier() { + return identifier; + } + + protected void closeSubtransactions() { + /* + * We share one exception for all failures, which are added + * as supressedExceptions to it. + */ + IllegalStateException failure = null; + for (T subtransaction : backingTxs.values()) { + try { + subtransaction.close(); + } catch (Exception e) { + // If we did not allocated failure we allocate it + if (failure == null) { + failure = new IllegalStateException("Uncaught exception occured during closing transaction", e); + } else { + // We update it with additional exceptions, which occurred during error. + failure.addSuppressed(e); + } + } + } + // If we have failure, we throw it at after all attempts to close. + if (failure != null) { + throw failure; + } + } + + protected DOMStoreTransactionFactory getTxFactory(K type){ + return storeTxFactories.get(type); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java new file mode 100644 index 0000000000..2187c6e0f9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Collection; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; + +public abstract class AbstractDOMTransactionFactory implements AutoCloseable { + private static final AtomicIntegerFieldUpdater UPDATER = + AtomicIntegerFieldUpdater.newUpdater(AbstractDOMTransactionFactory.class, "closed"); + private final Map storeTxFactories; + private volatile int closed = 0; + + protected AbstractDOMTransactionFactory(final Map txFactories) { + this.storeTxFactories = new EnumMap<>(txFactories); + } + + /** + * Implementations must return unique identifier for each and every call of + * this method; + * + * @return new Unique transaction identifier. + */ + protected abstract Object newTransactionIdentifier(); + + /** + * + * @param transaction + * @param cohorts + * @return + */ + protected abstract CheckedFuture submit(final DOMDataWriteTransaction transaction, + final Collection cohorts); + + /** + * + * @return + */ + public final DOMDataReadOnlyTransaction newReadOnlyTransaction() { + checkNotClosed(); + + return new DOMBrokerReadOnlyTransaction(newTransactionIdentifier(), storeTxFactories); + } + + + /** + * + * @return + */ + public final DOMDataWriteTransaction newWriteOnlyTransaction() { + checkNotClosed(); + + return new DOMBrokerWriteOnlyTransaction(newTransactionIdentifier(), storeTxFactories, this); + } + + + /** + * + * @return + */ + public final DOMDataReadWriteTransaction newReadWriteTransaction() { + checkNotClosed(); + + return new DOMBrokerReadWriteTransaction<>(newTransactionIdentifier(), storeTxFactories, this); + } + + /** + * Convenience accessor of backing factories intended to be used only by + * finalization of this class. + * + * Note: + * Finalization of this class may want to access other functionality of + * supplied Transaction factories. + * + * @return Map of backing transaction factories. + */ + protected final Map getTxFactories() { + return storeTxFactories; + } + + /** + * Checks if instance is not closed. + * + * @throws IllegalStateException If instance of this class was closed. + * + */ + protected final void checkNotClosed() { + Preconditions.checkState(closed == 0, "Transaction factory was closed. No further operations allowed."); + } + + @Override + public void close() { + final boolean success = UPDATER.compareAndSet(this, 0, 1); + Preconditions.checkState(success, "Transaction factory was already closed"); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java new file mode 100644 index 0000000000..656ced37c1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Map; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class DOMBrokerReadOnlyTransaction + extends AbstractDOMBrokerTransaction + implements DOMDataReadOnlyTransaction { + /** + * Creates new composite Transactions. + * + * @param identifier Identifier of transaction. + */ + protected DOMBrokerReadOnlyTransaction(Object identifier, Map storeTxFactories) { + super(identifier, storeTxFactories); + } + + @Override + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { + return getSubtransaction(store).read(path); + } + + @Override + public CheckedFuture exists( + final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + return getSubtransaction(store).exists(path); + } + + @Override + public void close() { + closeSubtransactions(); + } + + @Override + protected T createTransaction(LogicalDatastoreType key) { + return (T) getTxFactory(key).newReadOnlyTransaction(); + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java new file mode 100644 index 0000000000..efa7226219 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Map; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class DOMBrokerReadWriteTransaction + extends DOMBrokerWriteOnlyTransaction implements DOMDataReadWriteTransaction { + /** + * Creates new composite Transactions. + * + * @param identifier Identifier of transaction. + * @param storeTxFactories + */ + protected DOMBrokerReadWriteTransaction(Object identifier, Map storeTxFactories, final AbstractDOMTransactionFactory commitImpl) { + super(identifier, storeTxFactories, commitImpl); + } + + @Override + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { + return getSubtransaction(store).read(path); + } + + @Override + public CheckedFuture exists( + final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + return getSubtransaction(store).exists(path); + } + + @Override + protected DOMStoreReadWriteTransaction createTransaction(LogicalDatastoreType key) { + return getTxFactory(key).newReadWriteTransaction(); + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java new file mode 100644 index 0000000000..9610647fff --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DOMBrokerTransactionChain extends AbstractDOMTransactionFactory + implements DOMTransactionChain { + private static enum State { + RUNNING, + CLOSING, + CLOSED, + FAILED, + } + + private static final AtomicIntegerFieldUpdater COUNTER_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, "counter"); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, State.class, "state"); + private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerTransactionChain.class); + private final AtomicLong txNum = new AtomicLong(); + private final AbstractDOMBroker broker; + private final TransactionChainListener listener; + private final long chainId; + + private volatile State state = State.RUNNING; + private volatile int counter = 0; + + /** + * + * @param chainId + * ID of transaction chain + * @param chains + * Backing {@link DOMStoreTransactionChain}s. + * @param listener + * Listener, which listens on transaction chain events. + * @throws NullPointerException + * If any of arguments is null. + */ + public DOMBrokerTransactionChain(final long chainId, + final Map chains, + AbstractDOMBroker broker, final TransactionChainListener listener) { + super(chains); + this.chainId = chainId; + this.broker = Preconditions.checkNotNull(broker); + this.listener = Preconditions.checkNotNull(listener); + } + + private void checkNotFailed() { + Preconditions.checkState(state != State.FAILED, "Transaction chain has failed"); + } + + @Override + protected Object newTransactionIdentifier() { + return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement(); + } + + @Override + public CheckedFuture submit( + final DOMDataWriteTransaction transaction, final Collection cohorts) { + checkNotFailed(); + checkNotClosed(); + + final CheckedFuture ret = broker.submit(transaction, cohorts); + + COUNTER_UPDATER.incrementAndGet(this); + Futures.addCallback(ret, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + transactionCompleted(); + } + + @Override + public void onFailure(final Throwable t) { + transactionFailed(transaction, t); + } + }); + + return ret; + } + + @Override + public void close() { + final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING); + if (!success) { + LOG.debug("Chain {} is no longer running", this); + return; + } + + super.close(); + for (DOMStoreTransactionChain subChain : getTxFactories().values()) { + subChain.close(); + } + + if (counter == 0) { + finishClose(); + } + } + + private void finishClose() { + state = State.CLOSED; + listener.onTransactionChainSuccessful(this); + } + + private void transactionCompleted() { + if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) { + finishClose(); + } + } + + private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) { + state = State.FAILED; + LOG.debug("Transaction chain {} failed.", this, cause); + listener.onTransactionChainFailed(this, tx, cause); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java new file mode 100644 index 0000000000..6d00210629 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DOMBrokerWriteOnlyTransaction + extends AbstractDOMBrokerTransaction implements DOMDataWriteTransaction { + + private static final AtomicReferenceFieldUpdater IMPL_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, AbstractDOMTransactionFactory.class, "commitImpl"); + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater FUTURE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, Future.class, "commitFuture"); + private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerWriteOnlyTransaction.class); + private static final Future CANCELLED_FUTURE = Futures.immediateCancelledFuture(); + + /** + * Implementation of real commit. It also acts as an indication that + * the transaction is running -- which we flip atomically using + * {@link #IMPL_UPDATER}. + */ + private volatile AbstractDOMTransactionFactory commitImpl; + + /** + * Future task of transaction commit. It starts off as null, but is + * set appropriately on {@link #submit()} and {@link #cancel()} via + * {@link AtomicReferenceFieldUpdater#lazySet(Object, Object)}. + * + * Lazy set is safe for use because it is only referenced to in the + * {@link #cancel()} slow path, where we will busy-wait for it. The + * fast path gets the benefit of a store-store barrier instead of the + * usual store-load barrier. + */ + private volatile Future commitFuture; + + protected DOMBrokerWriteOnlyTransaction(final Object identifier, + Map storeTxFactories, final AbstractDOMTransactionFactory commitImpl) { + super(identifier, storeTxFactories); + this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null."); + } + + @Override + protected T createTransaction(LogicalDatastoreType key) { + // FIXME : Casting shouldn't be necessary here + return (T) getTxFactory(key).newWriteOnlyTransaction(); + } + + @Override + public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) { + checkRunning(commitImpl); + getSubtransaction(store).write(path, data); + } + + @Override + public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + checkRunning(commitImpl); + getSubtransaction(store).delete(path); + } + + @Override + public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) { + checkRunning(commitImpl); + getSubtransaction(store).merge(path, data); + } + + @Override + public boolean cancel() { + final AbstractDOMTransactionFactory impl = IMPL_UPDATER.getAndSet(this, null); + if (impl != null) { + LOG.trace("Transaction {} cancelled before submit", getIdentifier()); + FUTURE_UPDATER.lazySet(this, CANCELLED_FUTURE); + closeSubtransactions(); + return true; + } + + // The transaction is in process of being submitted or cancelled. Busy-wait + // for the corresponding future. + Future future; + do { + future = commitFuture; + } while (future == null); + + return future.cancel(false); + } + + @Deprecated + @Override + public ListenableFuture> commit() { + return AbstractDataTransaction.convertToLegacyCommitFuture(submit()); + } + + @Override + public CheckedFuture submit() { + final AbstractDOMTransactionFactory impl = IMPL_UPDATER.getAndSet(this, null); + checkRunning(impl); + + final Collection txns = getSubtransactions(); + final Collection cohorts = new ArrayList<>(txns.size()); + + // FIXME: deal with errors thrown by backed (ready and submit can fail in theory) + for (DOMStoreWriteTransaction txn : txns) { + cohorts.add(txn.ready()); + } + + final CheckedFuture ret = impl.submit(this, cohorts); + FUTURE_UPDATER.lazySet(this, ret); + return ret; + } + + private void checkRunning(final AbstractDOMTransactionFactory impl) { + Preconditions.checkState(impl != null, "Transaction %s is no longer running", getIdentifier()); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java index 538f2981da..06a43a6026 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java @@ -21,10 +21,10 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker; import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -34,13 +34,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3 + * ConcurrentDOMDataBroker commits transactions concurrently. The 3 * commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking * (ie async) per transaction but multiple transaction commits can run concurrent. * * @author Thomas Pantelis */ -public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { +public class ConcurrentDOMDataBroker extends AbstractDOMBroker { private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class); private static final String CAN_COMMIT = "CAN_COMMIT"; private static final String PRE_COMMIT = "PRE_COMMIT"; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java index 0b166f5ac8..80e25a167d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java @@ -9,11 +9,15 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; @@ -22,7 +26,10 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -36,11 +43,21 @@ import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; /** * Unit tests for DOMConcurrentDataCommitCoordinator. @@ -266,4 +283,229 @@ public class ConcurrentDOMDataBrokerTest { assertFailure(future, cause, mockCohort1, mockCohort2); } + + @Test + public void testCreateReadWriteTransaction(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + dataBroker.newReadWriteTransaction(); + + verify(domStore, never()).newReadWriteTransaction(); + } + + + @Test + public void testCreateWriteOnlyTransaction(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + dataBroker.newWriteOnlyTransaction(); + + verify(domStore, never()).newWriteOnlyTransaction(); + } + + @Test + public void testCreateReadOnlyTransaction(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + dataBroker.newReadOnlyTransaction(); + + verify(domStore, never()).newReadOnlyTransaction(); + } + + @Test + public void testLazySubTransactionCreationForReadWriteTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction(); + doReturn(storeTxn).when(configDomStore).newReadWriteTransaction(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor); + DOMDataReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction(); + + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, never()).newReadWriteTransaction(); + verify(operationalDomStore, times(1)).newReadWriteTransaction(); + + dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, times(1)).newReadWriteTransaction(); + verify(operationalDomStore, times(1)).newReadWriteTransaction(); + + } + + @Test + public void testLazySubTransactionCreationForWriteOnlyTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction(); + doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor); + DOMDataWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction(); + + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, never()).newWriteOnlyTransaction(); + verify(operationalDomStore, times(1)).newWriteOnlyTransaction(); + + dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, times(1)).newWriteOnlyTransaction(); + verify(operationalDomStore, times(1)).newWriteOnlyTransaction(); + + } + + + @Test + public void testLazySubTransactionCreationForReadOnlyTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction(); + doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor); + DOMDataReadOnlyTransaction dataTxn = dataBroker.newReadOnlyTransaction(); + + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, never()).newReadOnlyTransaction(); + verify(operationalDomStore, times(1)).newReadOnlyTransaction(); + + dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, times(1)).newReadOnlyTransaction(); + verify(operationalDomStore, times(1)).newReadOnlyTransaction(); + + } + + @Test + public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException { + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class); + + doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction(); + doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohort).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort).abort(); + + final CountDownLatch latch = new CountDownLatch(1); + final List commitCohorts = new ArrayList(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) { + @Override + public CheckedFuture submit(DOMDataWriteTransaction transaction, Collection cohorts) { + commitCohorts.addAll(cohorts); + latch.countDown(); + return super.submit(transaction, cohorts); + } + }; + DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction(); + + domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + domDataReadWriteTransaction.submit(); + + latch.await(10, TimeUnit.SECONDS); + + assertTrue(commitCohorts.size() == 1); + } + + @Test + public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException { + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class); + DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class); + + doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction(); + doReturn(configTransaction).when(configDomStore).newReadWriteTransaction(); + + doReturn(mockCohortOperational).when(operationalTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohortOperational).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohortOperational).abort(); + + doReturn(mockCohortConfig).when(configTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohortConfig).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohortConfig).abort(); + + + final CountDownLatch latch = new CountDownLatch(1); + final List commitCohorts = new ArrayList(); + + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) { + @Override + public CheckedFuture submit(DOMDataWriteTransaction transaction, Collection cohorts) { + commitCohorts.addAll(cohorts); + latch.countDown(); + return super.submit(transaction, cohorts); + } + }; + DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction(); + + domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + domDataReadWriteTransaction.submit(); + + latch.await(10, TimeUnit.SECONDS); + + assertTrue(commitCohorts.size() == 2); + } + + @Test + public void testCreateTransactionChain(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + + dataBroker.createTransactionChain(mock(TransactionChainListener.class)); + + verify(domStore, times(2)).createTransactionChain(); + + } + + @Test + public void testCreateTransactionOnChain(){ + DOMStore domStore = mock(DOMStore.class); + ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, + domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor); + + DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class); + + doReturn(mockChain).when(domStore).createTransactionChain(); + doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction(); + + DOMTransactionChain transactionChain = dataBroker.createTransactionChain(mock(TransactionChainListener.class)); + + DOMDataWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction(); + + verify(mockChain, never()).newWriteOnlyTransaction(); + + domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + } + }