Bug 1073: Added support to DOMBrokerImpl for Transaction Chaining 95/7395/14
authorTony Tkacik <ttkacik@cisco.com>
Mon, 26 May 2014 10:04:45 +0000 (12:04 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Thu, 12 Jun 2014 13:20:42 +0000 (15:20 +0200)
Splitted functionality of DOMBrokerImpl to separate classes
which allows easier code reuse with transaction chaining:

New internal APIs:
DOMDataCommitExecutor - Commit executor which invokes
   three-phase commit coordination
DOMDataCommitErrorListener - Error listener for commit executor
and one execution

AbstractDOMForwardedTransactionFactory - Factory which creates
   composite transactions on top of DOMStore transactions
DOMDataCommitCoordinatorImpl - Commit Executor implementation
DOMBrokerTransactionChainImpl - Implementation of DOMTransactionChain.

Added 2 JUnit tests for Transaction Chains:
  - Test positive scenario (chain of write, read, delete transactions)
  - Test IllegalStateException when previous transaction
    was not commited.

Change-Id: Ic2290d7fb3d4ea52a44bea02b493c1e537e929a6
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
18 files changed:
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataBroker.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedCompositeTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedTransactionFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitImplementation.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/DOMDataBrokerProxy.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/BlockingTransactionChainListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java [new file with mode: 0644]

index dbaba29..c120508 100644 (file)
@@ -1,6 +1,5 @@
 /*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
@@ -8,17 +7,45 @@
 package org.opendaylight.controller.md.sal.dom.api;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainFactory;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.sal.core.api.BrokerService;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public interface DOMDataBroker extends AsyncDataBroker<InstanceIdentifier, NormalizedNode<?, ?>, DOMDataChangeListener>, BrokerService {
+/**
+ * Data Broker which provides data transaction and data change listener fuctionality
+ * using {@link NormalizedNode} data format.
+ *
+ * This interface is type capture of generic interfaces and returns type captures
+ * of results for client-code convenience.
+ *
+ */
+public interface DOMDataBroker extends
+        AsyncDataBroker<InstanceIdentifier, NormalizedNode<?, ?>, DOMDataChangeListener>,
+        TransactionChainFactory<InstanceIdentifier, NormalizedNode<?, ?>>, BrokerService {
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
     DOMDataReadTransaction newReadOnlyTransaction();
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     DOMDataReadWriteTransaction newReadWriteTransaction();
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     DOMDataWriteTransaction newWriteOnlyTransaction();
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    DOMTransactionChain createTransactionChain(TransactionChainListener listener);
 }
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMTransactionChain.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMTransactionChain.java
new file mode 100644 (file)
index 0000000..b894911
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * A chain of DOM Data transactions.
+ *
+ * Transactions in a chain need to be committed in sequence and each
+ * transaction should see the effects of previous transactions as if they happened. A chain
+ * makes no guarantees of atomicity, in fact transactions are committed as soon as possible.
+ *
+ * <p>
+ * This interface is type capture of {@link TransactionChain} for DOM Data Contracts.
+ */
+public interface DOMTransactionChain extends TransactionChain<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+    @Override
+    DOMDataReadTransaction newReadOnlyTransaction();
+
+    @Override
+    DOMDataReadWriteTransaction newReadWriteTransaction();
+
+    @Override
+    DOMDataWriteTransaction newWriteOnlyTransaction();
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedCompositeTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedCompositeTransaction.java
new file mode 100644 (file)
index 0000000..0c07b06
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Composite DOM Transaction backed by {@link DOMStoreTransaction}.
+ *
+ * Abstract base for composite transaction, which provides access only to common
+ * functionality as retrieval of subtransaction, close method and retrieval of
+ * identifier.
+ *
+ * @param <K>
+ *            Subtransaction distinguisher
+ * @param <T>
+ *            Subtransaction type
+ */
+abstract class AbstractDOMForwardedCompositeTransaction<K, T extends DOMStoreTransaction> implements
+        AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+    private final ImmutableMap<K, T> backingTxs;
+    private final Object identifier;
+
+    /**
+     *
+     * Creates new composite Transactions.
+     *
+     * @param identifier
+     *            Identifier of transaction.
+     * @param backingTxs
+     *            Key,value map of backing transactions.
+     */
+    protected AbstractDOMForwardedCompositeTransaction(final Object identifier, final ImmutableMap<K, T> backingTxs) {
+        this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
+        this.backingTxs = Preconditions.checkNotNull(backingTxs, "Backing transactions should not be null");
+    }
+
+    /**
+     * Returns subtransaction associated with supplied key.
+     *
+     * @param key
+     * @return
+     * @throws NullPointerException
+     *             if key is null
+     * @throws IllegalArgumentException
+     *             if no subtransaction is associated with key.
+     */
+    protected final T getSubtransaction(final K key) {
+        Preconditions.checkNotNull(key, "key must not be null.");
+        Preconditions.checkArgument(backingTxs.containsKey(key), "No subtransaction associated with %s", key);
+        return backingTxs.get(key);
+    }
+
+    /**
+     * Returns immutable Iterable of all subtransactions.
+     *
+     */
+    protected Iterable<T> getSubtransactions() {
+        return backingTxs.values();
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public void close() {
+        /*
+         *  We share one exception for all failures, which are added
+         *  as supressedExceptions to it.
+         *
+         */
+        IllegalStateException failure = null;
+        for (T subtransaction : backingTxs.values()) {
+            try {
+                subtransaction.close();
+            } catch (Exception e) {
+                // If we did not allocated failure we allocate it
+                if(failure == null) {
+                    failure = new IllegalStateException("Uncaught exception occured during closing transaction.", e);
+                } else {
+                    // We update it with addotional exceptions, which occured during error.
+                    failure.addSuppressed(e);
+                }
+            }
+        }
+        // If we have failure, we throw it at after all attempts to close.
+        if(failure != null) {
+            throw failure;
+        }
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedTransactionFactory.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedTransactionFactory.java
new file mode 100644 (file)
index 0000000..7b5ea11
--- /dev/null
@@ -0,0 +1,225 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ *
+ * Abstract composite transaction factory.
+ *
+ * Provides an convenience common implementation for composite DOM Transactions,
+ * where subtransaction is identified by {@link LogicalDatastoreType} type and
+ * implementation of subtransaction is provided by
+ * {@link DOMStoreTransactionFactory}.
+ *
+ * <b>Note:</b>This class does not have thread-safe implementation of  {@link #close()},
+ *   implementation may allow accessing and allocating new transactions during closing
+ *   this instance.
+ *
+ * @param <T>
+ *            Type of {@link DOMStoreTransactionFactory} factory.
+ */
+public abstract class AbstractDOMForwardedTransactionFactory<T extends DOMStoreTransactionFactory> implements DOMDataCommitImplementation, AutoCloseable {
+
+    private final ImmutableMap<LogicalDatastoreType, T> storeTxFactories;
+
+    private boolean closed;
+
+    protected AbstractDOMForwardedTransactionFactory(final Map<LogicalDatastoreType, ? extends T> txFactories) {
+        this.storeTxFactories = ImmutableMap.copyOf(txFactories);
+    }
+
+    /**
+     * Implementations must return unique identifier for each and every call of
+     * this method;
+     *
+     * @return new Unique transaction identifier.
+     */
+    protected abstract Object newTransactionIdentifier();
+
+    /**
+     * Creates a new composite read-only transaction
+     *
+     * Creates a new composite read-only transaction backed by one transaction
+     * per factory in {@link #getTxFactories()}.
+     *
+     * Subtransaction for reading is selected by supplied
+     * {@link LogicalDatastoreType} as parameter for
+     * {@link DOMDataReadTransaction#read(LogicalDatastoreType, InstanceIdentifier)}
+     * .
+     *
+     * Id of returned transaction is retrieved via
+     * {@link #newTransactionIdentifier()}.
+     *
+     * @return New composite read-only transaction.
+     */
+    public DOMDataReadTransaction newReadOnlyTransaction() {
+        checkNotClosed();
+        ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadTransaction> builder = ImmutableMap.builder();
+        for (Entry<LogicalDatastoreType, T> store : storeTxFactories.entrySet()) {
+            builder.put(store.getKey(), store.getValue().newReadOnlyTransaction());
+        }
+        return new DOMForwardedReadOnlyTransaction(newTransactionIdentifier(), builder.build());
+    }
+
+
+
+    /**
+     * Creates a new composite write-only transaction
+     *
+     * <p>
+     * Creates a new composite write-only transaction backed by one write-only
+     * transaction per factory in {@link #getTxFactories()}.
+     *
+     * <p>
+     * Implementation of composite Write-only transaction is following:
+     *
+     * <ul>
+     * <li>
+     * {@link DOMDataWriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+     * - backing subtransaction is selected by {@link LogicalDatastoreType},
+     * {@link DOMStoreWriteTransaction#write(InstanceIdentifier, NormalizedNode)}
+     * is invoked on selected subtransaction.
+     * <li>
+     * {@link DOMDataWriteTransaction#merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+     * - backing subtransaction is selected by {@link LogicalDatastoreType},
+     * {@link DOMStoreWriteTransaction#merge(InstanceIdentifier, NormalizedNode)}
+     * is invoked on selected subtransaction.
+     * <li>
+     * {@link DOMDataWriteTransaction#delete(LogicalDatastoreType, InstanceIdentifier)
+     * - backing subtransaction is selected by {@link LogicalDatastoreType},
+     * {@link DOMStoreWriteTransaction#delete(InstanceIdentifier)} is invoked on
+     * selected subtransaction.
+     * <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
+     * {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
+     * and then invoking finalized implementation callback
+     * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+     * was commited and gathered results.
+     * </ul>
+     *
+     * Id of returned transaction is generated via
+     * {@link #newTransactionIdentifier()}.
+     *
+     * @return New composite write-only transaction associated with this
+     *         factory.
+     */
+    public DOMDataWriteTransaction newWriteOnlyTransaction() {
+        checkNotClosed();
+        ImmutableMap.Builder<LogicalDatastoreType, DOMStoreWriteTransaction> builder = ImmutableMap.builder();
+        for (Entry<LogicalDatastoreType, T> store : storeTxFactories.entrySet()) {
+            builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction());
+        }
+        return new DOMForwardedWriteTransaction<DOMStoreWriteTransaction>(newTransactionIdentifier(), builder.build(),
+                this);
+    }
+
+    /**
+     * Creates a new composite write-only transaction
+     *
+     * <p>
+     * Creates a new composite write-only transaction backed by one write-only
+     * transaction per factory in {@link #getTxFactories()}.
+     * <p>
+     * Implementation of composite Write-only transaction is following:
+     *
+     * <ul>
+     * <li>
+     * {@link DOMDataWriteTransaction#read(LogicalDatastoreType, InstanceIdentifier)}
+     * - backing subtransaction is selected by {@link LogicalDatastoreType},
+     * {@link DOMStoreWriteTransaction#read(InstanceIdentifier)} is invoked on
+     * selected subtransaction.
+     * <li>
+     * {@link DOMDataWriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+     * - backing subtransaction is selected by {@link LogicalDatastoreType},
+     * {@link DOMStoreWriteTransaction#write(InstanceIdentifier, NormalizedNode)}
+     * is invoked on selected subtransaction.
+     * <li>
+     * {@link DOMDataWriteTransaction#merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+     * - backing subtransaction is selected by {@link LogicalDatastoreType},
+     * {@link DOMStoreWriteTransaction#merge(InstanceIdentifier, NormalizedNode)}
+     * is invoked on selected subtransaction.
+     * <li>
+     * {@link DOMDataWriteTransaction#delete(LogicalDatastoreType, InstanceIdentifier)
+     * - backing subtransaction is selected by {@link LogicalDatastoreType},
+     * {@link DOMStoreWriteTransaction#delete(InstanceIdentifier)} is invoked on
+     * selected subtransaction.
+     * <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
+     * {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
+     * and then invoking finalized implementation callback
+     * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+     * was commited and gathered results.
+     * <li>
+     * </ul>
+     *
+     * Id of returned transaction is generated via
+     * {@link #newTransactionIdentifier()}.
+     *
+     * @return New composite read-write transaction associated with this
+     *         factory.
+     *
+     */
+    public DOMDataReadWriteTransaction newReadWriteTransaction() {
+        checkNotClosed();
+        ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadWriteTransaction> builder = ImmutableMap.builder();
+        for (Entry<LogicalDatastoreType, T> store : storeTxFactories.entrySet()) {
+            builder.put(store.getKey(), store.getValue().newReadWriteTransaction());
+        }
+        return new DOMForwardedReadWriteTransaction(newTransactionIdentifier(), builder.build(), this);
+    }
+
+    /**
+     * Convenience accessor of backing factories intended to be used only by
+     * finalization of this class.
+     *
+     * <b>Note:</b>
+     * Finalization of this class may want to access other functionality of
+     * supplied Transaction factories.
+     *
+     * @return Map of backing transaction factories.
+     */
+    protected final Map<LogicalDatastoreType, T> getTxFactories() {
+        return storeTxFactories;
+    }
+
+    /**
+     *
+     * Checks if instance is not closed.
+     *
+     * @throws IllegalStateException If instance of this class was closed.
+     *
+     */
+    @GuardedBy("this")
+    protected synchronized void checkNotClosed() {
+        Preconditions.checkState(!closed,"Transaction factory was closed. No further operations allowed.");
+    }
+
+    @Override
+    @GuardedBy("this")
+    public synchronized void close() {
+        closed = true;
+    }
+
+}
index 608ac9b..7e37a1e 100644 (file)
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
-public class DOMDataBrokerImpl implements DOMDataBroker, AutoCloseable {
+public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DOMStore> implements DOMDataBroker,
+        AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
-    private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class);
-    private final ImmutableMap<LogicalDatastoreType, DOMStore> datastores;
-    private final ListeningExecutorService executor;
+
+    private final DOMDataCommitCoordinatorImpl coordinator;
     private final AtomicLong txNum = new AtomicLong();
+    private final AtomicLong chainNum = new AtomicLong();
 
     public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
             final ListeningExecutorService executor) {
-        super();
-        this.datastores = datastores;
-        this.executor = executor;
+        super(datastores);
+        this.coordinator = new DOMDataCommitCoordinatorImpl(executor);
     }
 
-    private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
-
-        @Override
-        public Boolean apply(final Iterable<Boolean> input) {
-
-            for (Boolean value : input) {
-                if (value == false) {
-                    return Boolean.FALSE;
-                }
-            }
-            return Boolean.TRUE;
-        }
-    };
-
     @Override
-    public DOMDataReadTransaction newReadOnlyTransaction() {
-        ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadTransaction> builder = ImmutableMap.builder();
-        for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
-            builder.put(store.getKey(), store.getValue().newReadOnlyTransaction());
-        }
-        return new ReadOnlyTransactionImpl(newTransactionIdentifier(), builder.build());
-    }
-
-    private Object newTransactionIdentifier() {
+    protected Object newTransactionIdentifier() {
         return "DOM-" + txNum.getAndIncrement();
     }
 
-    @Override
-    public DOMDataReadWriteTransaction newReadWriteTransaction() {
-        ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadWriteTransaction> builder = ImmutableMap.builder();
-        for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
-            builder.put(store.getKey(), store.getValue().newReadWriteTransaction());
-        }
-        return new ReadWriteTransactionImpl(newTransactionIdentifier(), builder.build(), this);
-    }
-
-    @Override
-    public DOMDataWriteTransaction newWriteOnlyTransaction() {
-        ImmutableMap.Builder<LogicalDatastoreType, DOMStoreWriteTransaction> builder = ImmutableMap.builder();
-        for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
-            builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction());
-        }
-        return new WriteTransactionImpl<DOMStoreWriteTransaction>(newTransactionIdentifier(), builder.build(), this);
-    }
-
     @Override
     public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
             final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
 
-        DOMStore potentialStore = datastores.get(store);
+        DOMStore potentialStore = getTxFactories().get(store);
         checkState(potentialStore != null, "Requested logical data store is not available.");
         return potentialStore.registerChangeListener(path, listener, triggeringScope);
     }
 
-    private ListenableFuture<RpcResult<TransactionStatus>> submit(
-            final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
-        LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
-        return executor.submit(new CommitCoordination(transaction));
-    }
-
-    private abstract static class AbstractCompositeTransaction<K, T extends DOMStoreTransaction> implements
-            AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
-
-        private final ImmutableMap<K, T> backingTxs;
-        private final Object identifier;
-
-        protected AbstractCompositeTransaction(final Object identifier, final ImmutableMap<K, T> backingTxs) {
-            this.identifier = checkNotNull(identifier, "Identifier should not be null");
-            this.backingTxs = checkNotNull(backingTxs, "Backing transactions should not be null");
-        }
-
-        protected T getSubtransaction(final K key) {
-            return backingTxs.get(key);
-        }
-
-        public Iterable<T> getSubtransactions() {
-            return backingTxs.values();
-        }
-
-        @Override
-        public Object getIdentifier() {
-            return identifier;
-        }
-
-        @Override
-        public void close() {
-            try {
-                for (T subtransaction : backingTxs.values()) {
-                    subtransaction.close();
-                }
-            } catch (Exception e) {
-                throw new IllegalStateException("Uncaught exception occured during closing transaction.", e);
-            }
-        }
-
-    }
-
-    private static class ReadOnlyTransactionImpl extends
-            AbstractCompositeTransaction<LogicalDatastoreType, DOMStoreReadTransaction> implements
-            DOMDataReadTransaction {
-
-        protected ReadOnlyTransactionImpl(final Object identifier,
-                final ImmutableMap<LogicalDatastoreType, DOMStoreReadTransaction> backingTxs) {
-            super(identifier, backingTxs);
-        }
-
-        @Override
-        public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
-                final InstanceIdentifier path) {
-            return getSubtransaction(store).read(path);
-        }
-
-    }
-
-    private static class WriteTransactionImpl<T extends DOMStoreWriteTransaction> extends
-            AbstractCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
-
-        private final DOMDataBrokerImpl broker;
-        private ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts;
-
-        protected WriteTransactionImpl(final Object identifier, final ImmutableMap<LogicalDatastoreType, T> backingTxs,
-                final DOMDataBrokerImpl broker) {
-            super(identifier, backingTxs);
-            this.broker = broker;
-        }
-
-        public synchronized Iterable<DOMStoreThreePhaseCommitCohort> ready() {
-            checkState(cohorts == null, "Transaction was already marked as ready.");
-            ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
-            for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
-                cohortsBuilder.add(subTx.ready());
-            }
-            cohorts = cohortsBuilder.build();
-            return cohorts;
-        }
-
-        protected ImmutableList<DOMStoreThreePhaseCommitCohort> getCohorts() {
-            return cohorts;
-        }
-
-        @Override
-        public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
-            getSubtransaction(store).write(path, data);
-        }
-
-        @Override
-        public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
-            getSubtransaction(store).delete(path);
-        }
-
-        @Override
-        public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
-                final NormalizedNode<?, ?> data) {
-            getSubtransaction(store).merge(path,data);
-        }
-
-        @Override
-        public void cancel() {
-            // TODO Auto-generated method stub
-
-        }
-
-        @Override
-        public ListenableFuture<RpcResult<TransactionStatus>> commit() {
-
-            ready();
-            return broker.submit(this);
-        }
-
-    }
-
-    private static class ReadWriteTransactionImpl extends WriteTransactionImpl<DOMStoreReadWriteTransaction> implements
-            DOMDataReadWriteTransaction {
-
-        protected ReadWriteTransactionImpl(final Object identifier,
-                final ImmutableMap<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
-                final DOMDataBrokerImpl broker) {
-            // super(identifier, backingTxs);
-            super(identifier, backingTxs, broker);
-        }
-
-        @Override
-        public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
-                final InstanceIdentifier path) {
-            return getSubtransaction(store).read(path);
-        }
-    }
-
-    private final class CommitCoordination implements Callable<RpcResult<TransactionStatus>> {
-
-        private final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction;
-
-        public CommitCoordination(final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
-            this.transaction = transaction;
-        }
-
-        @Override
-        public RpcResult<TransactionStatus> call() throws Exception {
-
-            try {
-                Boolean canCommit = canCommit().get();
-
-                if (canCommit) {
-                    try {
-                        preCommit().get();
-                        try {
-                            commit().get();
-                            COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier());
-                            return Rpcs.getRpcResult(true, TransactionStatus.COMMITED,
-                                    Collections.<RpcError> emptySet());
-
-                        } catch (InterruptedException | ExecutionException e) {
-                            COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
-                        }
-
-                    } catch (InterruptedException | ExecutionException e) {
-                        COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort",
-                                transaction.getIdentifier(), e);
-                    }
-                } else {
-                    COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier());
-                    abort().get();
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e);
-
-            }
-            try {
-                abort().get();
-            } catch (InterruptedException | ExecutionException e) {
-                COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
-            }
-            return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError> emptySet());
-        }
-
-        public ListenableFuture<Void> preCommit() {
-            COORDINATOR_LOG.debug("Transaction {}: PreCommit Started ", transaction.getIdentifier());
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
-            for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
-                ops.add(cohort.preCommit());
-            }
-            return (ListenableFuture) Futures.allAsList(ops.build());
-        }
-
-        public ListenableFuture<Void> commit() {
-            COORDINATOR_LOG.debug("Transaction {}: Commit Started ", transaction.getIdentifier());
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
-            for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
-                ops.add(cohort.commit());
-            }
-            return (ListenableFuture) Futures.allAsList(ops.build());
-        }
-
-        public ListenableFuture<Boolean> canCommit() {
-            COORDINATOR_LOG.debug("Transaction {}: CanCommit Started ", transaction.getIdentifier());
-            Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
-            for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
-                canCommitOperations.add(cohort.canCommit());
-            }
-            ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
-            return Futures.transform(allCanCommits, AND_FUNCTION);
-        }
-
-        public ListenableFuture<Void> abort() {
-            COORDINATOR_LOG.debug("Transaction {}: Abort Started ", transaction.getIdentifier());
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
-            for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
-                ops.add(cohort.abort());
-            }
-            return (ListenableFuture) Futures.allAsList(ops.build());
-        };
+    @Override
+    public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+        ImmutableMap.Builder<LogicalDatastoreType, DOMStoreTransactionChain> backingChainsBuilder = ImmutableMap
+                .builder();
+        for (Entry<LogicalDatastoreType, DOMStore> entry : getTxFactories().entrySet()) {
+            backingChainsBuilder.put(entry.getKey(), entry.getValue().createTransactionChain());
+        }
+        long chainId = chainNum.getAndIncrement();
+        ImmutableMap<LogicalDatastoreType, DOMStoreTransactionChain> backingChains = backingChainsBuilder.build();
+        LOG.debug("Transactoin chain {} created with listener {}, backing store chains {}", chainId, listener,
+                backingChains);
+        return new DOMDataBrokerTransactionChainImpl(chainId, backingChains, coordinator, listener);
 
     }
 
     @Override
-    public void close() throws Exception {
-
+    public ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+        LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
+        return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
     }
 
 }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java
new file mode 100644 (file)
index 0000000..bcefc25
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * NormalizedNode implementation of {@link TransactionChain} which is backed
+ * by several {@link DOMStoreTransactionChain} differentiated by provided
+ * {@link LogicalDatastoreType} type.
+ *
+ */
+public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
+        implements DOMTransactionChain, DOMDataCommitErrorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
+    private final DOMDataCommitExecutor coordinator;
+    private final TransactionChainListener listener;
+    private final long chainId;
+    private final AtomicLong txNum = new AtomicLong();
+    @GuardedBy("this")
+    private boolean failed = false;
+
+    /**
+     *
+     * @param chainId
+     *            ID of transaction chain
+     * @param chains
+     *            Backing {@link DOMStoreTransactionChain}s.
+     * @param coordinator
+     *            Commit Coordinator which should be used to coordinate commits
+     *            of transaction
+     *            produced by this chain.
+     * @param listener
+     *            Listener, which listens on transaction chain events.
+     * @throws NullPointerException
+     *             If any of arguments is null.
+     */
+    public DOMDataBrokerTransactionChainImpl(final long chainId,
+            final ImmutableMap<LogicalDatastoreType, DOMStoreTransactionChain> chains,
+            final DOMDataCommitExecutor coordinator, final TransactionChainListener listener) {
+        super(chains);
+        this.chainId = chainId;
+        this.coordinator = Preconditions.checkNotNull(coordinator);
+        this.listener = Preconditions.checkNotNull(listener);
+    }
+
+    @Override
+    protected Object newTransactionIdentifier() {
+        return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
+    }
+
+    @Override
+    public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit(
+            final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+        return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
+    }
+
+    @Override
+    public synchronized void close() {
+        super.close();
+        for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
+            subChain.close();
+        }
+
+        if (!failed) {
+            LOG.debug("Transaction chain {}¬†successfully finished.", this);
+            listener.onTransactionChainSuccessful(this);
+        }
+    }
+
+    @Override
+    public synchronized void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+        failed = true;
+        LOG.debug("Transaction chain {}¬†failed.", this, cause);
+        listener.onTransactionChainFailed(this, tx, cause);
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
new file mode 100644 (file)
index 0000000..540e2fe
--- /dev/null
@@ -0,0 +1,409 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+/**
+ *
+ * Implementation of blocking three phase commit coordinator, which which
+ * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
+ *
+ * This implementation does not support cancelation of commit,
+ *
+ * In order to advance to next phase of three phase commit all subtasks of
+ * previous step must be finish.
+ *
+ * This executor does not have an upper bound on subtask timeout.
+ *
+ *
+ */
+public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
+
+    /**
+     * Runs AND binary operation between all booleans in supplied iteration of booleans.
+     *
+     * This method will stop evaluating iterables if first found is false.
+     */
+    private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
+
+        @Override
+        public Boolean apply(final Iterable<Boolean> input) {
+            for(boolean value : input) {
+               if(!value) {
+                   return Boolean.FALSE;
+               }
+            }
+            return Boolean.TRUE;
+        }
+    };
+
+    private final ListeningExecutorService executor;
+
+    /**
+     *
+     * Construct DOMDataCommitCoordinator which uses supplied executor to
+     * process commit coordinations.
+     *
+     * @param executor
+     */
+    public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
+        this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+        Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
+        Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
+        Preconditions.checkArgument(listener != null, "Listener must not be null");
+        LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
+        ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
+                transaction, cohorts, listener));
+        if (listener.isPresent()) {
+            Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
+        }
+        return commitFuture;
+    }
+
+    /**
+     *
+     * Phase of 3PC commit
+     *
+     * Represents phase of 3PC Commit
+     *
+     *
+     */
+    private static enum CommitPhase {
+        /**
+         *
+         * Commit Coordination Task is submitted for executing
+         *
+         */
+        SUBMITTED,
+        /**
+         * Commit Coordination Task is in can commit phase of 3PC
+         *
+         */
+        CAN_COMMIT,
+        /**
+         * Commit Coordination Task is in pre-commit phase of 3PC
+         *
+         */
+        PRE_COMMIT,
+        /**
+         * Commit Coordination Task is in commit phase of 3PC
+         *
+         */
+        COMMIT,
+        /**
+         * Commit Coordination Task is in abort phase of 3PC
+         *
+         */
+        ABORT
+    }
+
+    /**
+     *
+     * Implementation of blocking three-phase commit-coordination tasks without
+     * support of cancelation.
+     *
+     */
+    private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
+
+        private final DOMDataWriteTransaction tx;
+        private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
+
+        @GuardedBy("this")
+        private CommitPhase currentPhase;
+
+        public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
+                final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
+                final Optional<DOMDataCommitErrorListener> listener) {
+            this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
+            this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
+            this.currentPhase = CommitPhase.SUBMITTED;
+        }
+
+        @Override
+        public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
+
+            try {
+                canCommitBlocking();
+                preCommitBlocking();
+                return commitBlocking();
+            } catch (TransactionCommitFailedException e) {
+                LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
+                abortBlocking(e);
+                throw e;
+            }
+        }
+
+        /**
+         *
+         * Invokes canCommit on underlying cohorts and blocks till
+         * all results are returned.
+         *
+         * Valid state transition is from SUBMITTED to CAN_COMMIT,
+         * if currentPhase is not SUBMITTED throws IllegalStateException.
+         *
+         * @throws TransactionCommitFailedException
+         *             If one of cohorts failed can Commit
+         *
+         */
+        private void canCommitBlocking() throws TransactionCommitFailedException {
+            final Boolean canCommitResult = canCommitAll().checkedGet();
+            if (!canCommitResult) {
+                throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+            }
+        }
+
+        /**
+         *
+         * Invokes preCommit on underlying cohorts and blocks till
+         * all results are returned.
+         *
+         * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+         * state is not CAN_COMMIT
+         * throws IllegalStateException.
+         *
+         * @throws TransactionCommitFailedException
+         *             If one of cohorts failed preCommit
+         *
+         */
+        private void preCommitBlocking() throws TransactionCommitFailedException {
+            preCommitAll().checkedGet();
+        }
+
+        /**
+         *
+         * Invokes commit on underlying cohorts and blocks till
+         * all results are returned.
+         *
+         * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+         * IllegalStateException.
+         *
+         * @throws TransactionCommitFailedException
+         *             If one of cohorts failed preCommit
+         *
+         */
+        private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
+            commitAll().checkedGet();
+            return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.<RpcError> emptySet());
+        }
+
+        /**
+         * Aborts transaction.
+         *
+         * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
+         * cohorts, blocks
+         * for all results. If any of the abort failed throws
+         * IllegalStateException,
+         * which will contains originalCause as suppressed Exception.
+         *
+         * If aborts we're successful throws supplied exception
+         *
+         * @param originalCause
+         *            Exception which should be used to fail transaction for
+         *            consumers of transaction
+         *            future and listeners of transaction failure.
+         * @throws TransactionCommitFailedException
+         *             on invocation of this method.
+         *             originalCa
+         * @throws IllegalStateException
+         *             if abort failed.
+         */
+        private void abortBlocking(final TransactionCommitFailedException originalCause)
+                throws TransactionCommitFailedException {
+            LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
+            Exception cause = originalCause;
+            try {
+                abortAsyncAll().get();
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
+                cause = new IllegalStateException("Abort failed.", e);
+                cause.addSuppressed(e);
+            }
+            Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
+        }
+
+        /**
+         *
+         * Invokes preCommit on underlying cohorts and returns future
+         * which will complete once all preCommit on cohorts completed or
+         * failed.
+         *
+         *
+         * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+         * state is not CAN_COMMIT
+         * throws IllegalStateException.
+         *
+         * @return Future which will complete once all cohorts completed
+         *         preCommit.
+         *         Future throws TransactionCommitFailedException
+         *         If any of cohorts failed preCommit
+         *
+         */
+        private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
+            changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
+            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+                ops.add(cohort.preCommit());
+            }
+            /*
+             * We are returing all futures as list, not only succeeded ones in
+             * order to fail composite future if any of them failed.
+             * See Futures.allAsList for this description.
+             */
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+            return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
+        }
+
+        /**
+         *
+         * Invokes commit on underlying cohorts and returns future which
+         * completes
+         * once all commits on cohorts are completed.
+         *
+         * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+         * IllegalStateException
+         *
+         * @return Future which will complete once all cohorts completed
+         *         commit.
+         *         Future throws TransactionCommitFailedException
+         *         If any of cohorts failed preCommit
+         *
+         */
+        private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
+            changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
+            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+                ops.add(cohort.commit());
+            }
+            /*
+             * We are returing all futures as list, not only succeeded ones in
+             * order to fail composite future if any of them failed.
+             * See Futures.allAsList for this description.
+             */
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+            return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+        }
+
+        /**
+         *
+         * Invokes canCommit on underlying cohorts and returns composite future
+         * which will contains {@link Boolean#TRUE} only and only if
+         * all cohorts returned true.
+         *
+         * Valid state transition is from SUBMITTED to CAN_COMMIT,
+         * if currentPhase is not SUBMITTED throws IllegalStateException.
+         *
+         * @return Future which will complete once all cohorts completed
+         *         preCommit.
+         *         Future throws TransactionCommitFailedException
+         *         If any of cohorts failed preCommit
+         *
+         */
+        private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
+            changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
+            Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
+            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+                canCommitOperations.add(cohort.canCommit());
+            }
+            ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
+            ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
+            return Futures
+                    .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
+
+        }
+
+        /**
+         *
+         * Invokes abort on underlying cohorts and returns future which
+         * completes
+         * once all abort on cohorts are completed.
+         *
+         * @return Future which will complete once all cohorts completed
+         *         abort.
+         *
+         */
+        private ListenableFuture<Void> abortAsyncAll() {
+            changeStateFrom(currentPhase, CommitPhase.ABORT);
+            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+                ops.add(cohort.abort());
+            }
+            /*
+             * We are returing all futures as list, not only succeeded ones in
+             * order to fail composite future if any of them failed.
+             * See Futures.allAsList for this description.
+             */
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+            return compositeResult;
+        }
+
+        /**
+         * Change phase / state of transaction from expected value to new value
+         *
+         * This method checks state and updates state to new state of
+         * of this task if current state equals expected state.
+         * If expected state and current state are different raises
+         * IllegalStateException
+         * which means there is probably bug in implementation of commit
+         * coordination.
+         *
+         * If transition is successful, it logs transition on DEBUG level.
+         *
+         * @param currentExpected
+         *            Required phase for change of state
+         * @param newState
+         *            New Phase which will be entered by transaction.
+         * @throws IllegalStateException
+         *             If currentState of task does not match expected state
+         */
+        private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
+            Preconditions.checkState(currentPhase.equals(currentExpected),
+                    "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
+                    currentPhase, newState);
+            LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
+            currentPhase = newState;
+        };
+
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java
new file mode 100644 (file)
index 0000000..811d4d8
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+
+/**
+ *
+ * Utility implemetation of {@link FutureCallback} which is responsible
+ * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed.
+ *
+ * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener}
+ * callback is invoked with associated transaction and throwable is invoked on listener.
+ *
+ */
+class DOMDataCommitErrorInvoker implements FutureCallback<RpcResult<TransactionStatus>> {
+
+    private final DOMDataWriteTransaction tx;
+    private final DOMDataCommitErrorListener listener;
+
+
+    /**
+     *
+     * Construct new DOMDataCommitErrorInvoker.
+     *
+     * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)}
+     * @param listener Listener which should be invoked on error.
+     */
+    public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) {
+        this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null");
+        this.listener = Preconditions.checkNotNull(listener, "Listener must not be null");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+        listener.onCommitFailed(tx, t);
+    }
+
+    @Override
+    public void onSuccess(RpcResult<TransactionStatus> result) {
+        // NOOP
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java
new file mode 100644 (file)
index 0000000..3a4b54e
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.EventListener;
+
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+
+/**
+ *
+ * Listener on transaction failure which may be passed to
+ * {@link DOMDataCommitExecutor}. This listener is notified during transaction
+ * processing, before result is delivered to other client code outside MD-SAL.
+ * This allows implementors to update their internal state before transaction
+ * failure is visible to client code.
+ *
+ * This is internal API for MD-SAL implementations, for consumer facing error
+ * listeners see {@link TransactionChainListener}.
+ *
+ */
+interface DOMDataCommitErrorListener extends EventListener {
+
+    /**
+     *
+     * Callback which is invoked on transaction failure during three phase
+     * commit in {@link DOMDataCommitExecutor}.
+     *
+     *
+     * Implementation of this callback MUST NOT do any blocking calls or any
+     * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL
+     * Broker coordination thread.
+     *
+     * @param tx
+     *            Transaction which failed
+     * @param cause
+     *            Failure reason
+     */
+    void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause);
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java
new file mode 100644 (file)
index 0000000..f233912
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Executor of Three Phase Commit coordination for
+ * {@link DOMDataWriteTransaction} transactions.
+ * 
+ * Implementations are responsible for executing implementation of three-phase
+ * commit protocol on supplied {@link DOMStoreThreePhaseCommitCohort}s.
+ * 
+ * 
+ */
+interface DOMDataCommitExecutor {
+
+    /**
+     * Submits supplied transaction to be executed in context of provided
+     * cohorts.
+     * 
+     * Transaction is used only as a context, cohorts should be associated with
+     * this transaction.
+     * 
+     * @param tx
+     *            Transaction to be used as context for reporting
+     * @param cohort
+     *            DOM Store cohorts representing provided transaction, its
+     *            subtransactoins.
+     * @param listener
+     *            Error listener which should be notified if transaction failed.
+     * @return ListenableFuture which contains RpcResult with
+     *         {@link TransactionStatus#COMMITED} if commit coordination on
+     *         cohorts finished successfully.
+     * 
+     */
+    ListenableFuture<RpcResult<TransactionStatus>> submit(DOMDataWriteTransaction tx,
+            Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitImplementation.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitImplementation.java
new file mode 100644 (file)
index 0000000..ca2d711
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * 
+ * Implementation prototype of commit method for
+ * {@link DOMForwardedWriteTransaction}.
+ * 
+ */
+public interface DOMDataCommitImplementation {
+
+    /**
+     * User-supplied implementation of {@link DOMDataWriteTransaction#commit()}
+     * for transaction.
+     * 
+     * Callback invoked when {@link DOMDataWriteTransaction#commit()} is invoked
+     * on transaction created by this factory.
+     * 
+     * @param transaction
+     *            Transaction on which {@link DOMDataWriteTransaction#commit()}
+     *            was invoked.
+     * @param cohorts
+     *            Iteration of cohorts for subtransactions associated with
+     *            commited transaction.
+     * 
+     */
+    ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts);
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java
new file mode 100644 (file)
index 0000000..be55911
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * 
+ * Read Only Transaction, which is composed of several
+ * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by
+ * {@link LogicalDatastoreType} type parameter in
+ * {@link #read(LogicalDatastoreType, InstanceIdentifier)}.
+ */
+class DOMForwardedReadOnlyTransaction extends
+        AbstractDOMForwardedCompositeTransaction<LogicalDatastoreType, DOMStoreReadTransaction> implements
+        DOMDataReadTransaction {
+
+    protected DOMForwardedReadOnlyTransaction(final Object identifier,
+            final ImmutableMap<LogicalDatastoreType, DOMStoreReadTransaction> backingTxs) {
+        super(identifier, backingTxs);
+    }
+
+    @Override
+    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
+            final InstanceIdentifier path) {
+        return getSubtransaction(store).read(path);
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java
new file mode 100644 (file)
index 0000000..956e169
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * 
+ * Read-Write Transaction, which is composed of several
+ * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by
+ * {@link LogicalDatastoreType} type parameter in:
+ * 
+ * <ul>
+ * <li>{@link #read(LogicalDatastoreType, InstanceIdentifier)}
+ * <li>{@link #put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * <li>{@link #delete(LogicalDatastoreType, InstanceIdentifier)}
+ * <li>{@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * </ul>
+ * {@link #commit()} will result in invocation of
+ * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * invocation with all {@link DOMStoreThreePhaseCommitCohort} for underlying
+ * transactions.
+ * 
+ */
+class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements
+        DOMDataReadWriteTransaction {
+
+    protected DOMForwardedReadWriteTransaction(final Object identifier,
+            final ImmutableMap<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
+            final DOMDataCommitImplementation commitImpl) {
+        super(identifier, backingTxs, commitImpl);
+    }
+
+    @Override
+    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
+            final InstanceIdentifier path) {
+        return getSubtransaction(store).read(path);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedWriteTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedWriteTransaction.java
new file mode 100644 (file)
index 0000000..199438f
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * /**
+ * 
+ * Read-Write Transaction, which is composed of several
+ * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by
+ * {@link LogicalDatastoreType} type parameter in:
+ * 
+ * <ul>
+ * <li>{@link #put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * <li>{@link #delete(LogicalDatastoreType, InstanceIdentifier)}
+ * <li>{@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * </ul>
+ * <p>
+ * {@link #commit()} will result in invocation of
+ * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * invocation with all {@link DOMStoreThreePhaseCommitCohort} for underlying
+ * transactions.
+ * 
+ * @param <T>
+ *            Subtype of {@link DOMStoreWriteTransaction} which is used as
+ *            subtransaction.
+ */
+class DOMForwardedWriteTransaction<T extends DOMStoreWriteTransaction> extends
+        AbstractDOMForwardedCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
+
+    @GuardedBy("this")
+    private DOMDataCommitImplementation commitImpl;
+
+    @GuardedBy("this")
+    private boolean canceled;
+    @GuardedBy("this")
+    private ListenableFuture<RpcResult<TransactionStatus>> commitFuture;
+
+    protected DOMForwardedWriteTransaction(final Object identifier,
+            final ImmutableMap<LogicalDatastoreType, T> backingTxs, final DOMDataCommitImplementation commitImpl) {
+        super(identifier, backingTxs);
+        this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null.");
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkNotReady();
+        getSubtransaction(store).write(path, data);
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
+        checkNotReady();
+        getSubtransaction(store).delete(path);
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkNotReady();
+        getSubtransaction(store).merge(path, data);
+    }
+
+    @Override
+    public synchronized void cancel() {
+        checkState(!canceled, "Transaction was canceled.");
+        if (commitFuture != null) {
+            // FIXME: Implement cancelation of commit future
+            // when Broker impl will support cancelation.
+            throw new UnsupportedOperationException("Not implemented yet.");
+        }
+        canceled = true;
+        commitImpl = null;
+
+    }
+
+    @Override
+    public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        checkNotReady();
+
+        ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
+        for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
+            cohortsBuilder.add(subTx.ready());
+        }
+        ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts = cohortsBuilder.build();
+        commitFuture = commitImpl.commit(this, cohorts);
+        return commitFuture;
+    }
+
+    private void checkNotReady() {
+        checkNotCanceled();
+        checkNotCommited();
+    }
+
+    private void checkNotCanceled() {
+        Preconditions.checkState(!canceled, "Transaction was canceled.");
+    }
+
+    private void checkNotCommited() {
+        checkState(commitFuture == null, "Transaction was already commited.");
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java
new file mode 100644 (file)
index 0000000..87bd6c8
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+
+/**
+ *
+ * Utility exception mapper which translates {@link Exception}
+ * to {@link TransactionCommitFailedException}.
+ *
+ * This mapper is intended to be used with {@link Futures#makeChecked(com.google.common.util.concurrent.ListenableFuture, Function)}
+ * <ul>
+ * <li>if exception is {@link TransactionCommitFailedException} or one of its subclasses returns original exception.
+ * <li>if exception is {@link ExecutionException} and cause is  {@link TransactionCommitFailedException} return cause
+ * <li>otherwise returns {@link TransactionCommitFailedException} with original exception as a cause.
+ * </ul>
+ *
+ */
+final class TransactionCommitFailedExceptionMapper implements
+        Function<Exception, TransactionCommitFailedException> {
+
+    static final TransactionCommitFailedExceptionMapper PRE_COMMIT_MAPPER = create("canCommit");
+
+    static final TransactionCommitFailedExceptionMapper CAN_COMMIT_ERROR_MAPPER = create("preCommit");
+
+    static final TransactionCommitFailedExceptionMapper COMMIT_ERROR_MAPPER = create("commit");
+
+    private final String opName;
+
+    private TransactionCommitFailedExceptionMapper(final String opName) {
+        this.opName = Preconditions.checkNotNull(opName);
+    }
+
+    public static final TransactionCommitFailedExceptionMapper create(final String opName) {
+        return new TransactionCommitFailedExceptionMapper(opName);
+    }
+
+    @Override
+    public TransactionCommitFailedException apply(final Exception e) {
+        // If excetion is TransactionCommitFailedException
+        // we reuse it directly.
+        if (e instanceof TransactionCommitFailedException) {
+            return (TransactionCommitFailedException) e;
+        }
+        // If error is ExecutionException which was caused by cause of
+        // TransactionCommitFailedException
+        // we reuse original cause
+        if (e instanceof ExecutionException && e.getCause() instanceof TransactionCommitFailedException) {
+            return (TransactionCommitFailedException) e.getCause();
+        }
+        if (e instanceof InterruptedException) {
+            return new TransactionCommitFailedException(opName + " failed - DOMStore was interupted.", e);
+        }
+        // Otherwise we are using new exception, with original cause
+        return new TransactionCommitFailedException(opName + " failed", e);
+    }
+}
\ No newline at end of file
index 70db71f..b0ccfb9 100644 (file)
@@ -1,11 +1,13 @@
 package org.opendaylight.controller.sal.dom.broker.osgi;
 
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.osgi.framework.ServiceReference;
@@ -38,4 +40,9 @@ public class DOMDataBrokerProxy extends AbstractBrokerServiceProxy<DOMDataBroker
         return getDelegate().registerDataChangeListener(store, path, listener, triggeringScope);
     }
 
+    @Override
+    public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+        return getDelegate().createTransactionChain(listener);
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/BlockingTransactionChainListener.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/BlockingTransactionChainListener.java
new file mode 100644 (file)
index 0000000..f9c301c
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * Simple implementation of {@link TransactionChainListener} for testing.
+ * 
+ * This transaction chain listener does not contain any logic, only update
+ * futures ({@link #getFailFuture()} and {@link #getSuccessFuture()} when
+ * transaction chain event is retrieved.
+ * 
+ */
+class BlockingTransactionChainListener implements TransactionChainListener {
+
+    private final SettableFuture<Throwable> failFuture = SettableFuture.create();
+    private final SettableFuture<Void> successFuture = SettableFuture.create();
+
+    @Override
+    public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction,
+            Throwable cause) {
+        failFuture.set(cause);
+    }
+
+    @Override
+    public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+        successFuture.set(null);
+    }
+
+    public SettableFuture<Throwable> getFailFuture() {
+        return failFuture;
+    }
+
+    public SettableFuture<Void> getSuccessFuture() {
+        return successFuture;
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java
new file mode 100644 (file)
index 0000000..b360cb1
--- /dev/null
@@ -0,0 +1,209 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class DOMTransactionChainTest {
+
+    private SchemaContext schemaContext;
+    private DOMDataBrokerImpl domBroker;
+
+    @Before
+    public void setupStore() {
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+        schemaContext = TestModel.createTestContext();
+
+        operStore.onGlobalContextUpdated(schemaContext);
+        configStore.onGlobalContextUpdated(schemaContext);
+
+        ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
+                .put(CONFIGURATION, configStore) //
+                .put(OPERATIONAL, operStore) //
+                .build();
+
+        ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+        domBroker = new DOMDataBrokerImpl(stores, executor);
+    }
+
+    @Test
+    public void testTransactionChainNoConflict() throws InterruptedException, ExecutionException, TimeoutException {
+        BlockingTransactionChainListener listener = new BlockingTransactionChainListener();
+        DOMTransactionChain txChain = domBroker.createTransactionChain(listener);
+        assertNotNull(txChain);
+
+        /**
+         * We alocate new read-write transaction and write /test
+         * 
+         * 
+         */
+        DOMDataReadWriteTransaction firstTx = allocateAndWrite(txChain);
+
+        /**
+         * First transaction is marked as ready, we are able to allocate chained
+         * transactions
+         */
+        ListenableFuture<RpcResult<TransactionStatus>> firstWriteTxFuture = firstTx.commit();
+
+        /**
+         * We alocate chained transaction - read transaction.
+         */
+        DOMDataReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+
+        /**
+         * 
+         * We test if we are able to read data from tx, read should not fail
+         * since we are using chained transaction.
+         * 
+         * 
+         */
+        assertTestContainerExists(secondReadTx);
+
+        /**
+         * 
+         * We alocate next transaction, which is still based on first one, but
+         * is read-write.
+         * 
+         */
+        DOMDataReadWriteTransaction thirdDeleteTx = allocateAndDelete(txChain);
+
+        /**
+         * third transaction is sealed.
+         */
+        ListenableFuture<RpcResult<TransactionStatus>> thirdDeleteTxFuture = thirdDeleteTx.commit();
+
+        /**
+         * We commit first transaction
+         * 
+         */
+        assertCommitSuccessful(firstWriteTxFuture);
+
+        // Alocates store transaction
+        DOMDataReadTransaction storeReadTx = domBroker.newReadOnlyTransaction();
+        /**
+         * We verify transaction is commited to store, container should exists
+         * in datastore.
+         */
+        assertTestContainerExists(storeReadTx);
+        /**
+         * We commit third transaction
+         * 
+         */
+        assertCommitSuccessful(thirdDeleteTxFuture);
+
+        /**
+         * We close transaction chain.
+         */
+        txChain.close();
+
+        listener.getSuccessFuture().get(1000, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testTransactionChainNotSealed() throws InterruptedException, ExecutionException, TimeoutException {
+        BlockingTransactionChainListener listener = new BlockingTransactionChainListener();
+        DOMTransactionChain txChain = domBroker.createTransactionChain(listener);
+        assertNotNull(txChain);
+
+        /**
+         * We alocate new read-write transaction and write /test
+         * 
+         * 
+         */
+        allocateAndWrite(txChain);
+
+        /**
+         * We alocate chained transaction - read transaction, note first one is
+         * still not commited to datastore, so this allocation should fail with
+         * IllegalStateException.
+         */
+        try {
+            DOMDataReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+            fail("Allocation of secondReadTx should fail with IllegalStateException");
+        } catch (Exception e) {
+            assertTrue(e instanceof IllegalStateException);
+        }
+    }
+
+    private static DOMDataReadWriteTransaction allocateAndDelete(DOMTransactionChain txChain)
+            throws InterruptedException, ExecutionException {
+        DOMDataReadWriteTransaction tx = txChain.newReadWriteTransaction();
+
+        /**
+         * We test existence of /test in third transaction container should
+         * still be visible from first one (which is still uncommmited).
+         * 
+         */
+        assertTestContainerExists(tx);
+
+        /**
+         * We delete node in third transaction
+         */
+        tx.delete(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH);
+        return tx;
+    }
+
+    private static DOMDataReadWriteTransaction allocateAndWrite(DOMTransactionChain txChain)
+            throws InterruptedException, ExecutionException {
+        DOMDataReadWriteTransaction tx = txChain.newReadWriteTransaction();
+        assertTestContainerWrite(tx);
+        return tx;
+    }
+
+    private static void assertCommitSuccessful(ListenableFuture<RpcResult<TransactionStatus>> future)
+            throws InterruptedException, ExecutionException {
+        RpcResult<TransactionStatus> rpcResult = future.get();
+        assertTrue(rpcResult.isSuccessful());
+        assertEquals(TransactionStatus.COMMITED, rpcResult.getResult());
+    }
+
+    private static void assertTestContainerExists(DOMDataReadTransaction readTx) throws InterruptedException,
+            ExecutionException {
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> readFuture = readTx.read(OPERATIONAL, TestModel.TEST_PATH);
+        Optional<NormalizedNode<?, ?>> readedData = readFuture.get();
+        assertTrue(readedData.isPresent());
+    }
+
+    private static void assertTestContainerWrite(DOMDataReadWriteTransaction tx) throws InterruptedException,
+            ExecutionException {
+        tx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        assertTestContainerExists(tx);
+    }
+}