Merge "Create transaction on the backend datastore only when neccessary"
authorTony Tkacik <ttkacik@cisco.com>
Fri, 24 Apr 2015 10:09:48 +0000 (10:09 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 24 Apr 2015 10:09:48 +0000 (10:09 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBroker.java
new file mode 100644 (file)
index 0000000..833cb49
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractDOMBroker extends AbstractDOMTransactionFactory<DOMStore>
+        implements DOMDataBroker, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractDOMBroker.class);
+
+    private final AtomicLong txNum = new AtomicLong();
+    private final AtomicLong chainNum = new AtomicLong();
+    private final Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> extensions;
+    private volatile AutoCloseable closeable;
+
+    protected AbstractDOMBroker(final Map<LogicalDatastoreType, DOMStore> datastores) {
+        super(datastores);
+
+        boolean treeChange = true;
+        for (DOMStore ds : datastores.values()) {
+            if (!(ds instanceof DOMStoreTreeChangePublisher)) {
+                treeChange = false;
+                break;
+            }
+        }
+
+        if (treeChange) {
+            extensions = ImmutableMap.<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension>of(DOMDataTreeChangeService.class, new DOMDataTreeChangeService() {
+                @Override
+                public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(final DOMDataTreeIdentifier treeId, final L listener) {
+                    DOMStore publisher = getTxFactories().get(treeId.getDatastoreType());
+                    checkState(publisher != null, "Requested logical data store is not available.");
+
+                    return ((DOMStoreTreeChangePublisher) publisher).registerTreeChangeListener(treeId.getRootIdentifier(), listener);
+                }
+            });
+        } else {
+            extensions = Collections.emptyMap();
+        }
+    }
+
+    public void setCloseable(final AutoCloseable closeable) {
+        this.closeable = closeable;
+    }
+
+    @Override
+    public void close() {
+        super.close();
+
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                LOG.debug("Error closing instance", e);
+            }
+        }
+    }
+
+    @Override
+    protected Object newTransactionIdentifier() {
+        return "DOM-" + txNum.getAndIncrement();
+    }
+
+    @Override
+    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
+                                                                                  final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
+
+        DOMStore potentialStore = getTxFactories().get(store);
+        checkState(potentialStore != null, "Requested logical data store is not available.");
+        return potentialStore.registerChangeListener(path, listener, triggeringScope);
+    }
+
+    @Override
+    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+        return extensions;
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+        checkNotClosed();
+
+        final Map<LogicalDatastoreType, DOMStoreTransactionChain> backingChains = new EnumMap<>(LogicalDatastoreType.class);
+        for (Map.Entry<LogicalDatastoreType, DOMStore> entry : getTxFactories().entrySet()) {
+            backingChains.put(entry.getKey(), entry.getValue().createTransactionChain());
+        }
+
+        final long chainId = chainNum.getAndIncrement();
+        LOG.debug("Transaction chain {} created with listener {}, backing store chains {}", chainId, listener,
+                backingChains);
+        return new DOMBrokerTransactionChain(chainId, backingChains, this, listener);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerTransaction.java
new file mode 100644 (file)
index 0000000..98fea88
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public abstract class AbstractDOMBrokerTransaction<K, T extends DOMStoreTransaction> implements
+        AsyncTransaction<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+
+    private Map<K, T> backingTxs;
+    private final Object identifier;
+    private final Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories;
+
+    /**
+     *
+     * Creates new composite Transactions.
+     *
+     * @param identifier
+     *            Identifier of transaction.
+     */
+    protected AbstractDOMBrokerTransaction(final Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
+        this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
+        this.storeTxFactories = Preconditions.checkNotNull(storeTxFactories, "Store Transaction Factories should not be null");
+        this.backingTxs = new EnumMap(LogicalDatastoreType.class);
+    }
+
+    /**
+     * Returns subtransaction associated with supplied key.
+     *
+     * @param key
+     * @return
+     * @throws NullPointerException
+     *             if key is null
+     * @throws IllegalArgumentException
+     *             if no subtransaction is associated with key.
+     */
+    protected final T getSubtransaction(final K key) {
+        Preconditions.checkNotNull(key, "key must not be null.");
+
+        T ret = backingTxs.get(key);
+        if(ret == null){
+            ret = createTransaction(key);
+            backingTxs.put(key, ret);
+        }
+        Preconditions.checkArgument(ret != null, "No subtransaction associated with %s", key);
+        return ret;
+    }
+
+    protected abstract T createTransaction(final K key);
+
+    /**
+     * Returns immutable Iterable of all subtransactions.
+     *
+     */
+    protected Collection<T> getSubtransactions() {
+        return backingTxs.values();
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return identifier;
+    }
+
+    protected void closeSubtransactions() {
+        /*
+         * We share one exception for all failures, which are added
+         * as supressedExceptions to it.
+         */
+        IllegalStateException failure = null;
+        for (T subtransaction : backingTxs.values()) {
+            try {
+                subtransaction.close();
+            } catch (Exception e) {
+                // If we did not allocated failure we allocate it
+                if (failure == null) {
+                    failure = new IllegalStateException("Uncaught exception occured during closing transaction", e);
+                } else {
+                    // We update it with additional exceptions, which occurred during error.
+                    failure.addSuppressed(e);
+                }
+            }
+        }
+        // If we have failure, we throw it at after all attempts to close.
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    protected DOMStoreTransactionFactory getTxFactory(K type){
+        return storeTxFactories.get(type);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMTransactionFactory.java
new file mode 100644 (file)
index 0000000..2187c6e
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+
+public abstract class AbstractDOMTransactionFactory<T extends DOMStoreTransactionFactory> implements AutoCloseable {
+    private static final AtomicIntegerFieldUpdater<AbstractDOMTransactionFactory> UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(AbstractDOMTransactionFactory.class, "closed");
+    private final Map<LogicalDatastoreType, T> storeTxFactories;
+    private volatile int closed = 0;
+
+    protected AbstractDOMTransactionFactory(final Map<LogicalDatastoreType, T> txFactories) {
+        this.storeTxFactories = new EnumMap<>(txFactories);
+    }
+
+    /**
+     * Implementations must return unique identifier for each and every call of
+     * this method;
+     *
+     * @return new Unique transaction identifier.
+     */
+    protected abstract Object newTransactionIdentifier();
+
+    /**
+     *
+     * @param transaction
+     * @param cohorts
+     * @return
+     */
+    protected abstract CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
+                                                                                   final Collection<DOMStoreThreePhaseCommitCohort> cohorts);
+
+    /**
+     *
+     * @return
+     */
+    public final DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+        checkNotClosed();
+
+        return new DOMBrokerReadOnlyTransaction(newTransactionIdentifier(), storeTxFactories);
+    }
+
+
+    /**
+     *
+     * @return
+     */
+    public final DOMDataWriteTransaction newWriteOnlyTransaction() {
+        checkNotClosed();
+
+        return new DOMBrokerWriteOnlyTransaction(newTransactionIdentifier(), storeTxFactories, this);
+    }
+
+
+    /**
+     *
+     * @return
+     */
+    public final DOMDataReadWriteTransaction newReadWriteTransaction() {
+        checkNotClosed();
+
+        return new DOMBrokerReadWriteTransaction<>(newTransactionIdentifier(), storeTxFactories, this);
+    }
+
+    /**
+     * Convenience accessor of backing factories intended to be used only by
+     * finalization of this class.
+     *
+     * <b>Note:</b>
+     * Finalization of this class may want to access other functionality of
+     * supplied Transaction factories.
+     *
+     * @return Map of backing transaction factories.
+     */
+    protected final Map<LogicalDatastoreType, T> getTxFactories() {
+        return storeTxFactories;
+    }
+
+    /**
+     * Checks if instance is not closed.
+     *
+     * @throws IllegalStateException If instance of this class was closed.
+     *
+     */
+    protected final void checkNotClosed() {
+        Preconditions.checkState(closed == 0, "Transaction factory was closed. No further operations allowed.");
+    }
+
+    @Override
+    public void close() {
+        final boolean success = UPDATER.compareAndSet(this, 0, 1);
+        Preconditions.checkState(success, "Transaction factory was already closed");
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadOnlyTransaction.java
new file mode 100644 (file)
index 0000000..656ced3
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class DOMBrokerReadOnlyTransaction<T extends DOMStoreReadTransaction>
+    extends AbstractDOMBrokerTransaction<LogicalDatastoreType, T>
+        implements DOMDataReadOnlyTransaction {
+    /**
+     * Creates new composite Transactions.
+     *
+     * @param identifier Identifier of transaction.
+     */
+    protected DOMBrokerReadOnlyTransaction(Object identifier, Map storeTxFactories) {
+        super(identifier, storeTxFactories);
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+            final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        return getSubtransaction(store).read(path);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(
+            final LogicalDatastoreType store,
+            final YangInstanceIdentifier path) {
+        return getSubtransaction(store).exists(path);
+    }
+
+    @Override
+    public void close() {
+        closeSubtransactions();
+    }
+
+    @Override
+    protected T createTransaction(LogicalDatastoreType key) {
+        return (T) getTxFactory(key).newReadOnlyTransaction();
+    }
+
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerReadWriteTransaction.java
new file mode 100644 (file)
index 0000000..efa7226
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class DOMBrokerReadWriteTransaction<T extends DOMStoreReadWriteTransaction>
+        extends DOMBrokerWriteOnlyTransaction<DOMStoreReadWriteTransaction> implements DOMDataReadWriteTransaction {
+    /**
+     * Creates new composite Transactions.
+     *
+     * @param identifier Identifier of transaction.
+     * @param storeTxFactories
+     */
+    protected DOMBrokerReadWriteTransaction(Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory>  storeTxFactories, final AbstractDOMTransactionFactory<?> commitImpl) {
+        super(identifier, storeTxFactories, commitImpl);
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+            final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        return getSubtransaction(store).read(path);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(
+            final LogicalDatastoreType store,
+            final YangInstanceIdentifier path) {
+        return getSubtransaction(store).exists(path);
+    }
+
+    @Override
+    protected DOMStoreReadWriteTransaction createTransaction(LogicalDatastoreType key) {
+        return getTxFactory(key).newReadWriteTransaction();
+    }
+
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerTransactionChain.java
new file mode 100644 (file)
index 0000000..9610647
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMBrokerTransactionChain extends AbstractDOMTransactionFactory<DOMStoreTransactionChain>
+        implements DOMTransactionChain {
+    private static enum State {
+        RUNNING,
+        CLOSING,
+        CLOSED,
+        FAILED,
+    }
+
+    private static final AtomicIntegerFieldUpdater<DOMBrokerTransactionChain> COUNTER_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, "counter");
+    private static final AtomicReferenceFieldUpdater<DOMBrokerTransactionChain, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, State.class, "state");
+    private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerTransactionChain.class);
+    private final AtomicLong txNum = new AtomicLong();
+    private final AbstractDOMBroker broker;
+    private final TransactionChainListener listener;
+    private final long chainId;
+
+    private volatile State state = State.RUNNING;
+    private volatile int counter = 0;
+
+    /**
+     *
+     * @param chainId
+     *            ID of transaction chain
+     * @param chains
+     *            Backing {@link DOMStoreTransactionChain}s.
+     * @param listener
+     *            Listener, which listens on transaction chain events.
+     * @throws NullPointerException
+     *             If any of arguments is null.
+     */
+    public DOMBrokerTransactionChain(final long chainId,
+                                     final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains,
+                                     AbstractDOMBroker broker, final TransactionChainListener listener) {
+        super(chains);
+        this.chainId = chainId;
+        this.broker = Preconditions.checkNotNull(broker);
+        this.listener = Preconditions.checkNotNull(listener);
+    }
+
+    private void checkNotFailed() {
+        Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
+    }
+
+    @Override
+    protected Object newTransactionIdentifier() {
+        return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit(
+            final DOMDataWriteTransaction transaction, final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+        checkNotFailed();
+        checkNotClosed();
+
+        final CheckedFuture<Void, TransactionCommitFailedException> ret = broker.submit(transaction, cohorts);
+
+        COUNTER_UPDATER.incrementAndGet(this);
+        Futures.addCallback(ret, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                transactionCompleted();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                transactionFailed(transaction, t);
+            }
+        });
+
+        return ret;
+    }
+
+    @Override
+    public void close() {
+        final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING);
+        if (!success) {
+            LOG.debug("Chain {} is no longer running", this);
+            return;
+        }
+
+        super.close();
+        for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
+            subChain.close();
+        }
+
+        if (counter == 0) {
+            finishClose();
+        }
+    }
+
+    private void finishClose() {
+        state = State.CLOSED;
+        listener.onTransactionChainSuccessful(this);
+    }
+
+    private void transactionCompleted() {
+        if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) {
+            finishClose();
+        }
+    }
+
+    private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+        state = State.FAILED;
+        LOG.debug("Transaction chain {}¬†failed.", this, cause);
+        listener.onTransactionChainFailed(this, tx, cause);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMBrokerWriteOnlyTransaction.java
new file mode 100644 (file)
index 0000000..6d00210
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DOMBrokerWriteOnlyTransaction<T extends DOMStoreWriteTransaction>
+        extends AbstractDOMBrokerTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
+
+    private static final AtomicReferenceFieldUpdater<DOMBrokerWriteOnlyTransaction, AbstractDOMTransactionFactory> IMPL_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, AbstractDOMTransactionFactory.class, "commitImpl");
+    @SuppressWarnings("rawtypes")
+    private static final AtomicReferenceFieldUpdater<DOMBrokerWriteOnlyTransaction, Future> FUTURE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, Future.class, "commitFuture");
+    private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerWriteOnlyTransaction.class);
+    private static final Future<?> CANCELLED_FUTURE = Futures.immediateCancelledFuture();
+
+    /**
+     * Implementation of real commit. It also acts as an indication that
+     * the transaction is running -- which we flip atomically using
+     * {@link #IMPL_UPDATER}.
+     */
+    private volatile AbstractDOMTransactionFactory<?> commitImpl;
+
+    /**
+     * Future task of transaction commit. It starts off as null, but is
+     * set appropriately on {@link #submit()} and {@link #cancel()} via
+     * {@link AtomicReferenceFieldUpdater#lazySet(Object, Object)}.
+     *
+     * Lazy set is safe for use because it is only referenced to in the
+     * {@link #cancel()} slow path, where we will busy-wait for it. The
+     * fast path gets the benefit of a store-store barrier instead of the
+     * usual store-load barrier.
+     */
+    private volatile Future<?> commitFuture;
+
+    protected DOMBrokerWriteOnlyTransaction(final Object identifier,
+                                            Map storeTxFactories, final AbstractDOMTransactionFactory<?> commitImpl) {
+        super(identifier, storeTxFactories);
+        this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null.");
+    }
+
+    @Override
+    protected T createTransaction(LogicalDatastoreType key) {
+        // FIXME : Casting shouldn't be necessary here
+        return (T) getTxFactory(key).newWriteOnlyTransaction();
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkRunning(commitImpl);
+        getSubtransaction(store).write(path, data);
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        checkRunning(commitImpl);
+        getSubtransaction(store).delete(path);
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkRunning(commitImpl);
+        getSubtransaction(store).merge(path, data);
+    }
+
+    @Override
+    public boolean cancel() {
+        final AbstractDOMTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
+        if (impl != null) {
+            LOG.trace("Transaction {} cancelled before submit", getIdentifier());
+            FUTURE_UPDATER.lazySet(this, CANCELLED_FUTURE);
+            closeSubtransactions();
+            return true;
+        }
+
+        // The transaction is in process of being submitted or cancelled. Busy-wait
+        // for the corresponding future.
+        Future<?> future;
+        do {
+            future = commitFuture;
+        } while (future == null);
+
+        return future.cancel(false);
+    }
+
+    @Deprecated
+    @Override
+    public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        final AbstractDOMTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
+        checkRunning(impl);
+
+        final Collection<T> txns = getSubtransactions();
+        final Collection<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(txns.size());
+
+        // FIXME: deal with errors thrown by backed (ready and submit can fail in theory)
+        for (DOMStoreWriteTransaction txn : txns) {
+            cohorts.add(txn.ready());
+        }
+
+        final CheckedFuture<Void, TransactionCommitFailedException> ret = impl.submit(this, cohorts);
+        FUTURE_UPDATER.lazySet(this, ret);
+        return ret;
+    }
+
+    private void checkRunning(final AbstractDOMTransactionFactory<?> impl) {
+        Preconditions.checkState(impl != null, "Transaction %s is no longer running", getIdentifier());
+    }
+
+}
index 538f298..06a43a6 100644 (file)
@@ -21,10 +21,10 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -34,13 +34,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3
+ * ConcurrentDOMDataBroker commits transactions concurrently. The 3
  * commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
  * (ie async) per transaction but multiple transaction commits can run concurrent.
  *
  * @author Thomas Pantelis
  */
-public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
     private static final String CAN_COMMIT = "CAN_COMMIT";
     private static final String PRE_COMMIT = "PRE_COMMIT";
index 0b166f5..80e25a1 100644 (file)
@@ -9,11 +9,15 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
@@ -22,7 +26,10 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -36,11 +43,21 @@ import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 /**
  * Unit tests for DOMConcurrentDataCommitCoordinator.
@@ -266,4 +283,229 @@ public class ConcurrentDOMDataBrokerTest {
 
         assertFailure(future, cause, mockCohort1, mockCohort2);
     }
+
+    @Test
+    public void testCreateReadWriteTransaction(){
+        DOMStore domStore = mock(DOMStore.class);
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+        dataBroker.newReadWriteTransaction();
+
+        verify(domStore, never()).newReadWriteTransaction();
+    }
+
+
+    @Test
+    public void testCreateWriteOnlyTransaction(){
+        DOMStore domStore = mock(DOMStore.class);
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+        dataBroker.newWriteOnlyTransaction();
+
+        verify(domStore, never()).newWriteOnlyTransaction();
+    }
+
+    @Test
+    public void testCreateReadOnlyTransaction(){
+        DOMStore domStore = mock(DOMStore.class);
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+        dataBroker.newReadOnlyTransaction();
+
+        verify(domStore, never()).newReadOnlyTransaction();
+    }
+
+    @Test
+    public void testLazySubTransactionCreationForReadWriteTransactions(){
+        DOMStore configDomStore = mock(DOMStore.class);
+        DOMStore operationalDomStore = mock(DOMStore.class);
+        DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class);
+
+        doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction();
+        doReturn(storeTxn).when(configDomStore).newReadWriteTransaction();
+
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+        DOMDataReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction();
+
+        dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+        dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+        dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+        verify(configDomStore, never()).newReadWriteTransaction();
+        verify(operationalDomStore, times(1)).newReadWriteTransaction();
+
+        dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+        verify(configDomStore, times(1)).newReadWriteTransaction();
+        verify(operationalDomStore, times(1)).newReadWriteTransaction();
+
+    }
+
+    @Test
+    public void testLazySubTransactionCreationForWriteOnlyTransactions(){
+        DOMStore configDomStore = mock(DOMStore.class);
+        DOMStore operationalDomStore = mock(DOMStore.class);
+        DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class);
+
+        doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction();
+        doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction();
+
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+        DOMDataWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction();
+
+        dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+        dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+        verify(configDomStore, never()).newWriteOnlyTransaction();
+        verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
+
+        dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+        verify(configDomStore, times(1)).newWriteOnlyTransaction();
+        verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
+
+    }
+
+
+    @Test
+    public void testLazySubTransactionCreationForReadOnlyTransactions(){
+        DOMStore configDomStore = mock(DOMStore.class);
+        DOMStore operationalDomStore = mock(DOMStore.class);
+        DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class);
+
+        doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction();
+        doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction();
+
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+        DOMDataReadOnlyTransaction dataTxn = dataBroker.newReadOnlyTransaction();
+
+        dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+        dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+        verify(configDomStore, never()).newReadOnlyTransaction();
+        verify(operationalDomStore, times(1)).newReadOnlyTransaction();
+
+        dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build());
+
+        verify(configDomStore, times(1)).newReadOnlyTransaction();
+        verify(operationalDomStore, times(1)).newReadOnlyTransaction();
+
+    }
+
+    @Test
+    public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException {
+        DOMStore configDomStore = mock(DOMStore.class);
+        DOMStore operationalDomStore = mock(DOMStore.class);
+        DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
+        DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
+
+        doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
+        doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
+        doReturn(Futures.immediateFuture(false)).when(mockCohort).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort).abort();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList();
+
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) {
+            @Override
+            public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction, Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+                commitCohorts.addAll(cohorts);
+                latch.countDown();
+                return super.submit(transaction, cohorts);
+            }
+        };
+        DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
+
+        domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+        domDataReadWriteTransaction.submit();
+
+        latch.await(10, TimeUnit.SECONDS);
+
+        assertTrue(commitCohorts.size() == 1);
+    }
+
+    @Test
+    public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException {
+        DOMStore configDomStore = mock(DOMStore.class);
+        DOMStore operationalDomStore = mock(DOMStore.class);
+        DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
+        DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class);
+        DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class);
+        DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class);
+
+        doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction();
+        doReturn(configTransaction).when(configDomStore).newReadWriteTransaction();
+
+        doReturn(mockCohortOperational).when(operationalTransaction).ready();
+        doReturn(Futures.immediateFuture(false)).when(mockCohortOperational).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohortOperational).abort();
+
+        doReturn(mockCohortConfig).when(configTransaction).ready();
+        doReturn(Futures.immediateFuture(false)).when(mockCohortConfig).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohortConfig).abort();
+
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList();
+
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) {
+            @Override
+            public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction, Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+                commitCohorts.addAll(cohorts);
+                latch.countDown();
+                return super.submit(transaction, cohorts);
+            }
+        };
+        DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
+
+        domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+        domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+        domDataReadWriteTransaction.submit();
+
+        latch.await(10, TimeUnit.SECONDS);
+
+        assertTrue(commitCohorts.size() == 2);
+    }
+
+    @Test
+    public void testCreateTransactionChain(){
+        DOMStore domStore = mock(DOMStore.class);
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+
+        dataBroker.createTransactionChain(mock(TransactionChainListener.class));
+
+        verify(domStore, times(2)).createTransactionChain();
+
+    }
+
+    @Test
+    public void testCreateTransactionOnChain(){
+        DOMStore domStore = mock(DOMStore.class);
+        ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+                domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+
+        DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
+        DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class);
+
+        doReturn(mockChain).when(domStore).createTransactionChain();
+        doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction();
+
+        DOMTransactionChain transactionChain = dataBroker.createTransactionChain(mock(TransactionChainListener.class));
+
+        DOMDataWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction();
+
+        verify(mockChain, never()).newWriteOnlyTransaction();
+
+        domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+    }
+
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.