Bug 6696: Add transaction chain support to netconf connector 75/45575/16
authorAndrej Mak <andrej.mak@pantheon.tech>
Wed, 14 Sep 2016 06:08:31 +0000 (08:08 +0200)
committerJakub Morvay <jmorvay@cisco.com>
Mon, 7 Nov 2016 09:44:24 +0000 (10:44 +0100)
Change-Id: I3d680be67589e7f005ef470f0b3896c3256fca4b
Signed-off-by: Andrej Mak <andrej.mak@pantheon.tech>
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/NetconfDeviceDataBroker.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/AbstractWriteTx.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxChain.java [new file with mode: 0644]
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxListener.java [new file with mode: 0644]
netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/NetconfDeviceWriteOnlyTxTest.java
netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxChainTest.java [new file with mode: 0644]

index 421e52da71978af327083e7f39b83aa3262015a7..2a56d01b1a4f4702ba4d95b7560dfb76d92c948f 100644 (file)
@@ -24,6 +24,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadOnlyTx;
 import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.TxChain;
 import org.opendaylight.netconf.sal.connect.netconf.sal.tx.WriteCandidateRunningTx;
 import org.opendaylight.netconf.sal.connect.netconf.sal.tx.WriteCandidateTx;
 import org.opendaylight.netconf.sal.connect.netconf.sal.tx.WriteRunningTx;
@@ -38,8 +39,8 @@ public final class NetconfDeviceDataBroker implements DOMDataBroker {
     private final NetconfBaseOps netconfOps;
 
     private final boolean rollbackSupport;
-    private boolean candidateSupported;
-    private boolean runningWritable;
+    private final boolean candidateSupported;
+    private final boolean runningWritable;
 
     public NetconfDeviceDataBroker(final RemoteDeviceId id, final SchemaContext schemaContext, final DOMRpcService rpc, final NetconfSessionPreferences netconfSessionPreferences) {
         this.id = id;
@@ -83,7 +84,7 @@ public final class NetconfDeviceDataBroker implements DOMDataBroker {
 
     @Override
     public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
-        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+        return new TxChain(this, listener);
     }
 
     @Override
index 702657dabaf244138074c2435fcd8fa9dc686636..d0f89bc9b4910520baf01725160fe1965f3b13ec 100644 (file)
@@ -16,14 +16,18 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.config.util.xml.DocumentedException;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.netconf.api.NetconfDocumentedException;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.ModifyAction;
@@ -42,6 +46,7 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
     protected final NetconfBaseOps netOps;
     protected final boolean rollbackSupport;
     protected final List<ListenableFuture<DOMRpcResult>> resultsFutures;
+    private final List<TxListener> listeners = new CopyOnWriteArrayList<>();
     // Allow commit to be called only once
     protected boolean finished = false;
 
@@ -70,7 +75,7 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
         if(isFinished()) {
             return false;
         }
-
+        listeners.forEach(listener -> listener.onTransactionCancelled(this));
         finished = true;
         cleanup();
         return true;
@@ -131,10 +136,27 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
 
     @Override
     public final ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        listeners.forEach(listener -> listener.onTransactionSubmitted(this));
         checkNotFinished();
         finished = true;
+        final ListenableFuture<RpcResult<TransactionStatus>> result = performCommit();
+        Futures.addCallback(result, new FutureCallback<RpcResult<TransactionStatus>>() {
+            @Override
+            public void onSuccess(@Nullable final RpcResult<TransactionStatus> result) {
+                if (result != null && result.isSuccessful()) {
+                    listeners.forEach(txListener -> txListener.onTransactionSuccessful(AbstractWriteTx.this));
+                } else {
+                    final TransactionCommitFailedException cause = new TransactionCommitFailedException("Transaction failed", result.getErrors().toArray(new RpcError[result.getErrors().size()]));
+                    listeners.forEach(listener -> listener.onTransactionFailed(AbstractWriteTx.this, cause));
+                }
+            }
 
-        return performCommit();
+            @Override
+            public void onFailure(final Throwable t) {
+                listeners.forEach(listener -> listener.onTransactionFailed(AbstractWriteTx.this, t));
+            }
+        });
+        return result;
     }
 
     protected abstract ListenableFuture<RpcResult<TransactionStatus>> performCommit();
@@ -172,15 +194,20 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
             public void onFailure(final Throwable throwable) {
                 final NetconfDocumentedException exception =
                         new NetconfDocumentedException(
-                                new DocumentedException(id + ":RPC during tx returned an exception",
-                                        new Exception(throwable),
-                                        DocumentedException.ErrorType.APPLICATION,
-                                        DocumentedException.ErrorTag.OPERATION_FAILED,
-                                        DocumentedException.ErrorSeverity.ERROR));
+                                id + ":RPC during tx returned an exception",
+                                new Exception(throwable),
+                                DocumentedException.ErrorType.APPLICATION,
+                                DocumentedException.ErrorTag.OPERATION_FAILED,
+                                DocumentedException.ErrorSeverity.ERROR);
                 transformed.setException(exception);
             }
         });
 
         return transformed;
     }
+
+    AutoCloseable addListener(final TxListener listener) {
+        listeners.add(listener);
+        return () -> listeners.remove(listener);
+    }
 }
diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxChain.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxChain.java
new file mode 100644 (file)
index 0000000..7dbc835
--- /dev/null
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DOMTransactionChain} implementation for Netconf connector.
+ */
+public class TxChain implements DOMTransactionChain, TxListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TxChain.class);
+
+    private final DOMDataBroker dataBroker;
+    private final TransactionChainListener listener;
+    /**
+     * Submitted transactions that haven't completed yet.
+     */
+    private final Map<DOMDataWriteTransaction, AutoCloseable> pendingTransactions = new HashMap<>();
+
+    /**
+     * Transaction created by this chain that hasn't been submitted or cancelled yet.
+     */
+    private AbstractWriteTx currentTransaction = null;
+    private boolean closed = false;
+    private boolean successful = true;
+
+    public TxChain(final DOMDataBroker dataBroker, final TransactionChainListener listener) {
+        this.dataBroker = dataBroker;
+        this.listener = listener;
+    }
+
+    @Override
+    public synchronized DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+        checkOperationPermitted();
+        return dataBroker.newReadOnlyTransaction();
+    }
+
+    @Override
+    public synchronized AbstractWriteTx newWriteOnlyTransaction() {
+        checkOperationPermitted();
+        final DOMDataWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+        Preconditions.checkState(writeTransaction instanceof AbstractWriteTx);
+        final AbstractWriteTx pendingWriteTx = (AbstractWriteTx) writeTransaction;
+        pendingTransactions.put(pendingWriteTx, pendingWriteTx.addListener(this));
+        currentTransaction = pendingWriteTx;
+        return pendingWriteTx;
+    }
+
+    @Override
+    public synchronized DOMDataReadWriteTransaction newReadWriteTransaction() {
+        return new ReadWriteTx(dataBroker.newReadOnlyTransaction(), newWriteOnlyTransaction());
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!closed) {
+            closed = true;
+            notifyChainListenerSuccess();
+        }
+    }
+
+    @Override
+    public synchronized void onTransactionSuccessful(final AbstractWriteTx transaction) {
+        removePendingTx(transaction);
+        notifyChainListenerSuccess();
+    }
+
+    @Override
+    public synchronized void onTransactionFailed(final AbstractWriteTx transaction, final Throwable cause) {
+        removePendingTx(transaction);
+        successful = false;
+        if (currentTransaction != null) {
+            currentTransaction.cancel();
+        }
+        listener.onTransactionChainFailed(this, transaction, cause);
+    }
+
+    @Override
+    public synchronized void onTransactionSubmitted(final AbstractWriteTx transaction) {
+        currentTransaction = null;
+    }
+
+    @Override
+    public synchronized void onTransactionCancelled(final AbstractWriteTx transaction) {
+        removePendingTx(transaction);
+        currentTransaction = null;
+    }
+
+    private void removePendingTx(final AbstractWriteTx transaction) {
+        try {
+            pendingTransactions.remove(transaction).close();
+        } catch (final Exception e) {
+            LOG.error("Can't remove transaction listener registration", e);
+        }
+    }
+
+    /**
+     * Checks, if chain isn't closed and if there is no not submitted write transaction waiting.
+     */
+    private void checkOperationPermitted() {
+        if (closed) {
+            throw new TransactionChainClosedException("Transaction chain was closed");
+        }
+        Preconditions.checkState(currentTransaction == null, "Last write transaction has not finished yet");
+    }
+
+    private void notifyChainListenerSuccess() {
+        if (closed && pendingTransactions.isEmpty() && successful) {
+            listener.onTransactionChainSuccessful(this);
+        }
+    }
+
+}
diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxListener.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxListener.java
new file mode 100644 (file)
index 0000000..1c97cab
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
+
+interface TxListener {
+
+    /**
+     * Invoked, when transaction completes successfully.
+     * @param transaction transaction
+     */
+    void onTransactionSuccessful(AbstractWriteTx transaction);
+
+    /**
+     * Invoked, when transaction fails.
+     *
+     * @param transaction transaction
+     * @param cause cause
+     */
+    void onTransactionFailed(AbstractWriteTx transaction, Throwable cause);
+
+    /**
+     * Invoked, when transaction is cancelled.
+     * @param transaction transaction
+     */
+    void onTransactionCancelled(AbstractWriteTx transaction);
+
+    /**
+     * Invoked, when transaction is submitted.
+     * @param transaction transaction
+     */
+    void onTransactionSubmitted(AbstractWriteTx transaction);
+
+
+}
index 3efa83719f3caa3ec787c3ee76f1303669259e28..7d4dd0abaecdecf03333cf144653ea68b62599c2 100644 (file)
@@ -15,6 +15,7 @@ import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_CANDIDATE_QNAME;
 import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_FILTER_QNAME;
@@ -24,8 +25,10 @@ import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTr
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.net.InetSocketAddress;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -140,4 +143,51 @@ public class NetconfDeviceWriteOnlyTxTest {
         inOrder.verify(rpc).invokeRpc(toPath(NetconfMessageTransformUtil.NETCONF_UNLOCK_QNAME), NetconfBaseOps.getUnLockContent(NETCONF_RUNNING_QNAME));
     }
 
+    @Test
+    public void testListenerSuccess() throws Exception {
+        doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult((NormalizedNode<?, ?>) null)))
+                .when(rpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+        final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, BaseSchema.BASE_NETCONF_CTX.getSchemaContext()), false);
+        final TxListener listener = mock(TxListener.class);
+        tx.addListener(listener);
+        tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
+        tx.submit();
+        verify(listener).onTransactionSubmitted(tx);
+        verify(listener).onTransactionSuccessful(tx);
+        verify(listener, never()).onTransactionFailed(eq(tx), any());
+        verify(listener, never()).onTransactionCancelled(tx);
+    }
+
+    @Test
+    public void testListenerCancellation() throws Exception {
+        doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult((NormalizedNode<?, ?>) null)))
+                .when(rpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+        final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, BaseSchema.BASE_NETCONF_CTX.getSchemaContext()), false);
+        final TxListener listener = mock(TxListener.class);
+        tx.addListener(listener);
+        tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
+        tx.cancel();
+        verify(listener).onTransactionCancelled(tx);
+        verify(listener, never()).onTransactionSubmitted(tx);
+        verify(listener, never()).onTransactionSuccessful(tx);
+        verify(listener, never()).onTransactionFailed(eq(tx), any());
+    }
+
+    @Test
+    public void testListenerFailure() throws Exception {
+        final IllegalStateException cause = new IllegalStateException("Failed tx");
+        doReturn(Futures.immediateFailedCheckedFuture(cause))
+                .when(rpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+        final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc, BaseSchema.BASE_NETCONF_CTX.getSchemaContext()), false);
+        final TxListener listener = mock(TxListener.class);
+        tx.addListener(listener);
+        tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
+        tx.submit();
+        final ArgumentCaptor<Exception> excCaptor = ArgumentCaptor.forClass(Exception.class);
+        verify(listener).onTransactionSubmitted(tx);
+        verify(listener).onTransactionFailed(eq(tx), excCaptor.capture());
+        Assert.assertEquals(cause, excCaptor.getValue().getCause().getCause());
+        verify(listener, never()).onTransactionSuccessful(tx);
+        verify(listener, never()).onTransactionCancelled(tx);
+    }
 }
diff --git a/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxChainTest.java b/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/TxChainTest.java
new file mode 100644 (file)
index 0000000..e56b02d
--- /dev/null
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+
+public class TxChainTest {
+
+    @Mock
+    private DOMDataBroker broker;
+    @Mock
+    private TransactionChainListener listener;
+    @Mock
+    private DOMDataReadOnlyTransaction readOnlyTx;
+    @Mock
+    private AbstractWriteTx writeOnlyTx1;
+    @Mock
+    private AbstractWriteTx writeOnlyTx2;
+    @Mock
+    private AbstractWriteTx writeOnlyTx3;
+    @Mock
+    private AutoCloseable registration1;
+    @Mock
+    private AutoCloseable registration2;
+    @Mock
+    private AutoCloseable registration3;
+    private final ArgumentCaptor<TxListener> captor = ArgumentCaptor.forClass(TxListener.class);
+    private TxChain chain;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        when(broker.newReadOnlyTransaction()).thenReturn(readOnlyTx);
+        when(broker.newWriteOnlyTransaction()).thenReturn(writeOnlyTx1).thenReturn(writeOnlyTx2).thenReturn(writeOnlyTx3);
+        when(writeOnlyTx1.addListener(any())).thenReturn(registration1);
+        when(writeOnlyTx2.addListener(any())).thenReturn(registration2);
+        when(writeOnlyTx3.addListener(any())).thenReturn(registration3);
+        chain = new TxChain(broker, listener);
+    }
+
+    @Test()
+    public void testNewReadOnlyTransactionPrevSubmitted() throws Exception {
+        chain.newWriteOnlyTransaction();
+        verify(writeOnlyTx1).addListener(captor.capture());
+        captor.getValue().onTransactionSubmitted(writeOnlyTx1);
+        chain.newReadOnlyTransaction();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testNewReadOnlyTransactionPrevNotSubmitted() throws Exception {
+        chain.newWriteOnlyTransaction();
+        chain.newReadOnlyTransaction();
+    }
+
+    @Test
+    public void testNewReadWriteTransactionPrevSubmitted() throws Exception {
+        chain.newReadWriteTransaction();
+        verify(writeOnlyTx1).addListener(captor.capture());
+        captor.getValue().onTransactionSubmitted(writeOnlyTx1);
+        chain.newReadWriteTransaction();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testNewReadWriteTransactionPrevNotSubmitted() throws Exception {
+        chain.newReadWriteTransaction();
+        chain.newReadWriteTransaction();
+    }
+
+    @Test
+    public void testNewWriteOnlyTransactionPrevSubmitted() throws Exception {
+        chain.newWriteOnlyTransaction();
+        verify(writeOnlyTx1).addListener(captor.capture());
+        captor.getValue().onTransactionSubmitted(writeOnlyTx1);
+        chain.newWriteOnlyTransaction();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testNewWriteOnlyTransactionPrevNotSubmitted() throws Exception {
+        chain.newWriteOnlyTransaction();
+        chain.newWriteOnlyTransaction();
+    }
+
+    @Test(expected = TransactionChainClosedException.class)
+    public void testCloseAfterFinished() throws Exception {
+        chain.close();
+        verify(listener).onTransactionChainSuccessful(chain);
+        chain.newReadOnlyTransaction();
+    }
+
+    @Test
+    public void testChainFail() throws Exception {
+        final AbstractWriteTx writeTx = chain.newWriteOnlyTransaction();
+        final ArgumentCaptor<TxListener> captor = ArgumentCaptor.forClass(TxListener.class);
+        verify(writeOnlyTx1).addListener(captor.capture());
+        writeTx.submit();
+        final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail");
+        captor.getValue().onTransactionFailed(writeOnlyTx1, cause);
+        verify(registration1).close();
+        verify(listener).onTransactionChainFailed(chain, writeOnlyTx1, cause);
+    }
+
+    @Test
+    public void testChainSuccess() throws Exception {
+        final AbstractWriteTx writeTx = chain.newWriteOnlyTransaction();
+        chain.close();
+        verify(writeOnlyTx1).addListener(captor.capture());
+        writeTx.submit();
+        captor.getValue().onTransactionSuccessful(writeOnlyTx1);
+        verify(registration1).close();
+        verify(listener).onTransactionChainSuccessful(chain);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        final AbstractWriteTx writeTx = chain.newWriteOnlyTransaction();
+        verify(writeOnlyTx1).addListener(captor.capture());
+        writeTx.cancel();
+        captor.getValue().onTransactionCancelled(writeOnlyTx1);
+        chain.newWriteOnlyTransaction();
+    }
+
+    @Test
+    public void testMultiplePendingTransactions() throws Exception {
+        //create 1st tx
+        final AbstractWriteTx writeTx1 = chain.newWriteOnlyTransaction();
+        final ArgumentCaptor<TxListener> captor1 = ArgumentCaptor.forClass(TxListener.class);
+        verify(writeOnlyTx1).addListener(captor1.capture());
+        //submit 1st tx
+        writeTx1.submit();
+        captor1.getValue().onTransactionSubmitted(writeOnlyTx1);
+
+        //create 2nd tx
+        final AbstractWriteTx writeTx2 = chain.newWriteOnlyTransaction();
+        final ArgumentCaptor<TxListener> captor2 = ArgumentCaptor.forClass(TxListener.class);
+        verify(writeTx2).addListener(captor2.capture());
+        //submit 2nd tx
+        writeTx2.submit();
+        captor2.getValue().onTransactionSubmitted(writeOnlyTx2);
+
+        //create 3rd tx
+        final AbstractWriteTx writeTx3 = chain.newWriteOnlyTransaction();
+        final ArgumentCaptor<TxListener> captor3 = ArgumentCaptor.forClass(TxListener.class);
+        verify(writeTx3).addListener(captor3.capture());
+        //cancel 3rd tx
+        writeTx3.cancel();
+        captor3.getValue().onTransactionCancelled(writeOnlyTx3);
+
+        //close chain
+        chain.close();
+
+        //complete first two transactions successfully
+        captor1.getValue().onTransactionSuccessful(writeOnlyTx1);
+        captor2.getValue().onTransactionSuccessful(writeOnlyTx2);
+
+        verify(registration1).close();
+        verify(registration2).close();
+        verify(registration3).close();
+        verify(listener).onTransactionChainSuccessful(chain);
+    }
+
+    @Test
+    public void testMultiplePendingTransactionsFail() throws Exception {
+        //create 1st tx
+        final AbstractWriteTx writeTx1 = chain.newWriteOnlyTransaction();
+        final ArgumentCaptor<TxListener> captor1 = ArgumentCaptor.forClass(TxListener.class);
+        verify(writeOnlyTx1).addListener(captor1.capture());
+        //submit 1st tx
+        writeTx1.submit();
+        captor1.getValue().onTransactionSubmitted(writeOnlyTx1);
+
+        //create 2nd tx
+        final AbstractWriteTx writeTx2 = chain.newWriteOnlyTransaction();
+        final ArgumentCaptor<TxListener> captor2 = ArgumentCaptor.forClass(TxListener.class);
+        verify(writeTx2).addListener(captor2.capture());
+        //submit 2nd tx
+        writeTx2.submit();
+        captor2.getValue().onTransactionSubmitted(writeOnlyTx2);
+
+        //create 3rd tx
+        final AbstractWriteTx writeTx3 = chain.newWriteOnlyTransaction();
+        final ArgumentCaptor<TxListener> captor3 = ArgumentCaptor.forClass(TxListener.class);
+        verify(writeTx3).addListener(captor3.capture());
+
+        chain.close();
+
+        //fail 1st transaction
+        final Exception cause1 = new Exception("fail");
+        captor1.getValue().onTransactionFailed(writeOnlyTx1, cause1);
+        //current unsubmitted transaction should be cancelled
+        verify(writeTx3).cancel();
+        captor3.getValue().onTransactionCancelled(writeTx3);
+        //2nd transaction success
+        captor2.getValue().onTransactionSuccessful(writeOnlyTx2);
+
+        verify(registration1).close();
+        verify(registration2).close();
+        verify(registration3).close();
+        verify(listener).onTransactionChainFailed(chain, writeOnlyTx1, cause1);
+        // 1 transaction failed, onTransactionChainSuccessful must not be called
+        verify(listener, never()).onTransactionChainSuccessful(chain);
+    }
+}
\ No newline at end of file