Add AbstractProxyTransaction derived classes tests 89/53589/3
authorAndrej Mak <andrej.mak@pantheon.tech>
Tue, 21 Mar 2017 07:05:34 +0000 (08:05 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 21 Mar 2017 11:53:06 +0000 (11:53 +0000)
Change-Id: Ie78c9213b9ca9a41066b34463557b6feb6f8b18d
Signed-off-by: Andrej Mak <andrej.mak@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TransactionTester.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TranasactionTester.java with 83% similarity]

index 9929b402ca94e8d1a3a924126198bfd4e7e59338..987b90c57272dd9af821b175657462fbb6a0dc18 100644 (file)
@@ -9,7 +9,10 @@ package org.opendaylight.controller.cluster.access.client;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 
 /**
@@ -32,4 +35,10 @@ public class AccessClientUtil {
         connection.receiveResponse(envelope);
     }
 
+    public static ConnectionEntry createConnectionEntry(final Request<?, ?> request,
+                                                        final Consumer<Response<?, ?>> callback,
+                                                        final long now) {
+        return new ConnectionEntry(request, callback, now);
+    }
+
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java
new file mode 100644 (file)
index 0000000..6b37bc0
--- /dev/null
@@ -0,0 +1,297 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.core.Is.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.google.common.primitives.UnsignedLong;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
+    protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
+    private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
+    private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
+
+    protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
+            .node(QName.create("ns-1", "node-1"))
+            .build();
+    protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
+            .node(QName.create("ns-1", "node-2"))
+            .build();
+    protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
+            .node(QName.create("ns-1", "node-3"))
+            .build();
+    protected static final ContainerNode DATA_1 = Builders.containerBuilder()
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
+            .build();
+    protected static final ContainerNode DATA_2 = Builders.containerBuilder()
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
+            .build();
+    protected static final String PERSISTENCE_ID = "per-1";
+
+    @Mock
+    private DataTreeSnapshot snapshot;
+    @Mock
+    private AbstractClientHistory history;
+    private ActorSystem system;
+    private TestProbe backendProbe;
+    private TestProbe clientContextProbe;
+    private TransactionTester<T> tester;
+    protected T transaction;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.apply();
+        clientContextProbe = new TestProbe(system, "clientContext");
+        backendProbe = new TestProbe(system, "backend");
+        final ClientActorContext context =
+                AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+        final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+                "default", UnsignedLong.ZERO, Optional.empty(), 3);
+        final AbstractClientConnection<ShardBackendInfo> connection =
+                AccessClientUtil.createConnectedConnection(context, 0L, backend);
+        final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
+        transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
+        tester = new TransactionTester<>(transaction, connection, backendProbe);
+    }
+
+    protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system);
+    }
+
+    @Test
+    public abstract void testExists() throws Exception;
+
+    @Test
+    public abstract void testRead() throws Exception;
+
+    @Test
+    public abstract void testWrite() throws Exception;
+
+    @Test
+    public abstract void testMerge() throws Exception;
+
+    @Test
+    public abstract void testDelete() throws Exception;
+
+    @Test
+    public abstract void testDirectCommit() throws Exception;
+
+    @Test
+    public abstract void testCanCommit() throws Exception;
+
+    @Test
+    public abstract void testPreCommit() throws Exception;
+
+    @Test
+    public abstract void testDoCommit() throws Exception;
+
+    @Test
+    public abstract void testForwardToRemoteAbort() throws Exception;
+
+    @Test
+    public abstract void testForwardToRemoteCommit() throws Exception;
+
+    @Test
+    public void testAbortVotingFuture() throws Exception {
+        testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
+    }
+
+    @Test
+    public void testForwardToRemotePurge() throws Exception {
+        final TestProbe probe = new TestProbe(system);
+        final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
+        testForwardToRemote(request, TransactionPurgeRequest.class);
+    }
+
+    @Test
+    public void testReplayMessages() throws Exception {
+        final TestProbe probe = new TestProbe(system);
+        final List<ConnectionEntry> entries = new ArrayList<>();
+        final Consumer<Response<?, ?>> callback = createCallbackMock();
+        final ReadTransactionRequest request1 =
+                new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
+        final ExistsTransactionRequest request2 =
+                new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
+        entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
+        entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
+        final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
+        final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
+        transaction.recordSuccessfulRequest(successful1);
+        final ReadTransactionRequest successful2 =
+                new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+        transaction.recordSuccessfulRequest(successful2);
+        transaction.startReconnect();
+        transaction.replayMessages(successor.getTransaction(), entries);
+        Assert.assertEquals(successful1, successor.expectTransactionRequest(AbortLocalTransactionRequest.class));
+        Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
+        Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
+        Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
+    }
+
+    protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
+        final List<TransactionModification> modifications = modifyRequest.getModifications();
+        Assert.assertEquals(3, modifications.size());
+        Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and((hasPath(PATH_1)))));
+        Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and((hasPath(PATH_2)))));
+        Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and((hasPath(PATH_3)))));
+    }
+
+    protected void testRequestResponse(final Consumer<VotingFuture> consumer,
+                                       final Class<? extends TransactionRequest> expectedRequest,
+                                       final BiFunction<TransactionIdentifier, Long, TransactionSuccess> replySupplier)
+            throws Exception {
+        final TransactionTester<T> tester = getTester();
+        final VotingFuture future = mock(VotingFuture.class);
+        transaction.seal();
+        consumer.accept(future);
+        final TransactionRequest req = tester.expectTransactionRequest(expectedRequest);
+        tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
+        verify(future).voteYes();
+    }
+
+    protected <T extends TransactionRequest> T testHandleForwardedRemoteRequest(final T request) throws Exception {
+        transaction.handleForwardedRemoteRequest(request, createCallbackMock());
+        final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
+        final T received = (T) envelope.getMessage();
+        Assert.assertTrue(received.getClass().equals(request.getClass()));
+        Assert.assertEquals(TRANSACTION_ID, received.getTarget());
+        Assert.assertEquals(clientContextProbe.ref(), received.getReplyTo());
+        return received;
+    }
+
+    protected <T extends TransactionRequest> T testForwardToRemote(final TransactionRequest toForward,
+                                                                   final Class<T> expectedMessageClass) {
+        final Consumer<Response<?, ?>> callback = createCallbackMock();
+        final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
+        final RemoteProxyTransaction successor = transactionTester.getTransaction();
+        transaction.forwardToRemote(successor, toForward, callback);
+        return transactionTester.expectTransactionRequest(expectedMessageClass);
+    }
+
+    protected TransactionTester<T> getTester() {
+        return tester;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> Consumer<T> createCallbackMock() {
+        return (Consumer<T>) mock(Consumer.class);
+    }
+
+    protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
+        return new BaseMatcher<TransactionModification>() {
+
+            @Override
+            public boolean matches(final Object item) {
+                return path.equals(((TransactionModification) item).getPath());
+            }
+
+            @Override
+            public void describeTo(final Description description) {
+                description.appendValue(path);
+            }
+
+            @Override
+            public void describeMismatch(final Object item, final Description description) {
+                final TransactionModification modification = (TransactionModification) item;
+                description.appendText("was ").appendValue(modification.getPath());
+            }
+        };
+    }
+
+    protected TestProbe createProbe() {
+        return new TestProbe(system);
+    }
+
+    protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
+        final TestProbe backendProbe = new TestProbe(system, "backend2");
+        final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
+        final ClientActorContext context =
+                AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+        final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+                "default", UnsignedLong.ZERO, Optional.empty(), 3);
+        final AbstractClientConnection<ShardBackendInfo> connection =
+                AccessClientUtil.createConnectedConnection(context, 0L, backend);
+        final AbstractClientHistory history = mock(AbstractClientHistory.class);
+        final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
+        final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
+        when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
+        final LocalReadWriteProxyTransaction tx =
+                new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
+        return new TransactionTester<>(tx, connection, backendProbe);
+    }
+
+    protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
+        final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
+        final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
+        final AbstractClientHistory history = mock(AbstractClientHistory.class);
+        final ClientActorContext context =
+                AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+        final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+                "default", UnsignedLong.ZERO, Optional.empty(), 5);
+        final AbstractClientConnection<ShardBackendInfo> connection =
+                AccessClientUtil.createConnectedConnection(context, 0L, backend);
+        final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
+        final RemoteProxyTransaction transaction =
+                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+        return new TransactionTester<>(transaction, connection, backendProbe);
+    }
+}
index 9497157ff8f5071fd753cea1c834972ccedb9d58..294c62a23ca70426c6879d28fdd8ae32c776c31d 100644 (file)
@@ -56,7 +56,7 @@ public class ClientTransactionCommitCohortTest {
     @Mock
     private AbstractClientHistory history;
     private ActorSystem system;
-    private List<TranasactionTester> transactions;
+    private List<TransactionTester> transactions;
     private ClientTransactionCommitCohort cohort;
 
     @Before
@@ -71,7 +71,7 @@ public class ClientTransactionCommitCohortTest {
             transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
         }
         final Collection<AbstractProxyTransaction> proxies = transactions.stream()
-                .map(TranasactionTester::getTransaction)
+                .map(TransactionTester::getTransaction)
                 .collect(Collectors.toList());
         proxies.forEach(AbstractProxyTransaction::seal);
         cohort = new ClientTransactionCommitCohort(history, TRANSACTION_ID, proxies);
@@ -124,51 +124,51 @@ public class ClientTransactionCommitCohortTest {
         testOpFail(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess);
     }
 
-    private void expectCanCommit(final TranasactionTester tester) {
+    private void expectCanCommit(final TransactionTester<RemoteProxyTransaction> tester) {
         final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
         Assert.assertTrue(request.getPersistenceProtocol().isPresent());
         Assert.assertEquals(PersistenceProtocol.THREE_PHASE, request.getPersistenceProtocol().get());
     }
 
-    void expectPreCommit(final TranasactionTester tester) {
+    void expectPreCommit(final TransactionTester tester) {
         tester.expectTransactionRequest(TransactionPreCommitRequest.class);
     }
 
-    void expectCommit(final TranasactionTester tester) {
+    void expectCommit(final TransactionTester tester) {
         tester.expectTransactionRequest(TransactionDoCommitRequest.class);
     }
 
-    void expectAbort(final TranasactionTester tester) {
+    void expectAbort(final TransactionTester tester) {
         tester.expectTransactionRequest(TransactionAbortRequest.class);
     }
 
-    void replyCanCommitSuccess(final TranasactionTester tester) {
+    void replyCanCommitSuccess(final TransactionTester tester) {
         final RequestSuccess<?, ?> success = new TransactionCanCommitSuccess(tester.getTransaction().getIdentifier(),
                 tester.getLastReceivedMessage().getSequence());
         tester.replySuccess(success);
     }
 
-    void replyPreCommitSuccess(final TranasactionTester tester) {
+    void replyPreCommitSuccess(final TransactionTester tester) {
         final RequestSuccess<?, ?> success = new TransactionPreCommitSuccess(tester.getTransaction().getIdentifier(),
                 tester.getLastReceivedMessage().getSequence());
         tester.replySuccess(success);
     }
 
-    void replyCommitSuccess(final TranasactionTester tester) {
+    void replyCommitSuccess(final TransactionTester tester) {
         final RequestSuccess<?, ?> success = new TransactionCommitSuccess(tester.getTransaction().getIdentifier(),
                 tester.getLastReceivedMessage().getSequence());
         tester.replySuccess(success);
     }
 
-    void replyAbortSuccess(final TranasactionTester tester) {
+    void replyAbortSuccess(final TransactionTester tester) {
         final RequestSuccess<?, ?> success = new TransactionAbortSuccess(tester.getTransaction().getIdentifier(),
                 tester.getLastReceivedMessage().getSequence());
         tester.replySuccess(success);
     }
 
-    private static TranasactionTester createTransactionTester(final TestProbe backendProbe,
-                                                              final ClientActorContext context,
-                                                              final AbstractClientHistory history) {
+    private static TransactionTester createTransactionTester(final TestProbe backendProbe,
+                                                             final ClientActorContext context,
+                                                             final AbstractClientHistory history) {
         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
         final AbstractClientConnection<ShardBackendInfo> connection =
@@ -176,13 +176,13 @@ public class ClientTransactionCommitCohortTest {
         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
         final RemoteProxyTransaction transaction =
                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
-        return new TranasactionTester(transaction, connection, backendProbe);
+        return new TransactionTester(transaction, connection, backendProbe);
     }
 
-    private void replySuccess(final Collection<TranasactionTester> transactions,
-                              final Consumer<TranasactionTester> expect,
-                              final Consumer<TranasactionTester> reply) {
-        for (final TranasactionTester transaction : transactions) {
+    private void replySuccess(final Collection<TransactionTester> transactions,
+                              final Consumer<TransactionTester> expect,
+                              final Consumer<TransactionTester> reply) {
+        for (final TransactionTester transaction : transactions) {
             expect.accept(transaction);
             reply.accept(transaction);
         }
@@ -201,8 +201,8 @@ public class ClientTransactionCommitCohortTest {
      * @throws Exception unexpected exception
      */
     private <T> void testOpSuccess(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
-                                   final Consumer<TranasactionTester> expectFunction,
-                                   final Consumer<TranasactionTester> replyFunction,
+                                   final Consumer<TransactionTester> expectFunction,
+                                   final Consumer<TransactionTester> replyFunction,
                                    final T expectedResult) throws Exception {
         final ListenableFuture<T> result = operation.apply(cohort);
         replySuccess(transactions, expectFunction, replyFunction);
@@ -221,13 +221,13 @@ public class ClientTransactionCommitCohortTest {
      * @throws Exception unexpected exception
      */
     private <T> void testOpFail(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
-                                final Consumer<TranasactionTester> expectFunction,
-                                final Consumer<TranasactionTester> replyFunction) throws Exception {
+                                final Consumer<TransactionTester> expectFunction,
+                                final Consumer<TransactionTester> replyFunction) throws Exception {
         final ListenableFuture<T> canCommit = operation.apply(cohort);
         //reply success to all except last transaction
         replySuccess(transactions.subList(0, transactions.size() - 1), expectFunction, replyFunction);
         //reply fail to last transaction
-        final TranasactionTester last = transactions.get(transactions.size() - 1);
+        final TransactionTester last = transactions.get(transactions.size() - 1);
         expectFunction.accept(last);
         final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException());
         last.replyFailure(cause);
index 9c92afbc0c62b27bd3de43988971b79fd490bb38..24e898a80b0302311fb82f633051962cfffff6ef 100644 (file)
@@ -41,7 +41,7 @@ public class DirectTransactionCommitCohortTest {
     @Mock
     private AbstractClientHistory history;
     private ActorSystem system;
-    private TranasactionTester transaction;
+    private TransactionTester<?> transaction;
     private DirectTransactionCommitCohort cohort;
 
     @Before
@@ -94,9 +94,9 @@ public class DirectTransactionCommitCohortTest {
         Assert.assertNull(getWithTimeout(commit));
     }
 
-    private static TranasactionTester createTransactionTester(final TestProbe backendProbe,
-                                                              final ClientActorContext context,
-                                                              final AbstractClientHistory history) {
+    private static TransactionTester<?> createTransactionTester(final TestProbe backendProbe,
+                                                                final ClientActorContext context,
+                                                                final AbstractClientHistory history) {
         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
         final AbstractClientConnection<ShardBackendInfo> connection =
@@ -104,7 +104,7 @@ public class DirectTransactionCommitCohortTest {
         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
         final RemoteProxyTransaction transaction =
                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
-        return new TranasactionTester(transaction, connection, backendProbe);
+        return new TransactionTester<>(transaction, connection, backendProbe);
     }
 
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java
new file mode 100644 (file)
index 0000000..89a4bda
--- /dev/null
@@ -0,0 +1,163 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
+
+import akka.testkit.TestProbe;
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+
+public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
+        extends AbstractProxyTransactionTest<T> {
+
+    @Override
+    @Test
+    public void testExists() throws Exception {
+        assertFutureEquals(true, transaction.exists(PATH_1));
+        assertFutureEquals(false, transaction.exists(PATH_3));
+    }
+
+    @Override
+    @Test
+    public void testRead() throws Exception {
+        assertFutureEquals(com.google.common.base.Optional.of(DATA_1), transaction.read(PATH_1));
+        assertFutureEquals(com.google.common.base.Optional.absent(), transaction.read(PATH_3));
+    }
+
+    @Test
+    public void testDoAbort() throws Exception {
+        transaction.doAbort();
+        getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
+    }
+
+    @Test
+    public void testHandleForwardedRemoteReadRequest() throws Exception {
+        final TestProbe probe = createProbe();
+        final ReadTransactionRequest request =
+                new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+        final Consumer<Response<?, ?>> callback = createCallbackMock();
+        transaction.handleForwardedRemoteRequest(request, callback);
+        final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
+        verify(callback).accept(captor.capture());
+        final Response value = captor.getValue();
+        Assert.assertTrue(value instanceof ReadTransactionSuccess);
+        final ReadTransactionSuccess success = (ReadTransactionSuccess) value;
+        Assert.assertTrue(success.getData().isPresent());
+        Assert.assertEquals(DATA_1, success.getData().get());
+    }
+
+    @Test
+    public void testHandleForwardedRemoteExistsRequest() throws Exception {
+        final TestProbe probe = createProbe();
+        final ExistsTransactionRequest request =
+                new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+        final Consumer<Response<?, ?>> callback = createCallbackMock();
+        transaction.handleForwardedRemoteRequest(request, callback);
+        final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
+        verify(callback).accept(captor.capture());
+        final Response value = captor.getValue();
+        Assert.assertTrue(value instanceof ExistsTransactionSuccess);
+        final ExistsTransactionSuccess success = (ExistsTransactionSuccess) value;
+        Assert.assertTrue(success.getExists());
+    }
+
+    @Test
+    public void testHandleForwardedRemotePurgeRequest() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionPurgeRequest request =
+                new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
+        testHandleForwardedRemoteRequest(request);
+    }
+
+    @Override
+    @Test
+    public void testForwardToRemoteAbort() throws Exception {
+        final TestProbe probe = createProbe();
+        final AbortLocalTransactionRequest request = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
+        final ModifyTransactionRequest modifyRequest = testForwardToRemote(request, ModifyTransactionRequest.class);
+        Assert.assertTrue(modifyRequest.getPersistenceProtocol().isPresent());
+        Assert.assertEquals(PersistenceProtocol.ABORT, modifyRequest.getPersistenceProtocol().get());
+    }
+
+    @Override
+    @Test
+    public void testForwardToRemoteCommit() throws Exception {
+        final TestProbe probe = createProbe();
+        final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
+        final CommitLocalTransactionRequest request =
+                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), modification, true);
+        doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
+        final ModifyTransactionRequest modifyRequest = testForwardToRemote(request, ModifyTransactionRequest.class);
+        verify(modification).applyToCursor(any());
+        Assert.assertTrue(modifyRequest.getPersistenceProtocol().isPresent());
+        Assert.assertEquals(PersistenceProtocol.THREE_PHASE, modifyRequest.getPersistenceProtocol().get());
+        checkModifications(modifyRequest);
+    }
+
+    @Test
+    public void testForwardToLocalAbort() throws Exception {
+        final TestProbe probe = createProbe();
+        final AbortLocalTransactionRequest request = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
+        testForwardToLocal(request, AbortLocalTransactionRequest.class);
+    }
+
+    @Test
+    public void testForwardToLocalPurge() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
+        testForwardToLocal(request, TransactionPurgeRequest.class);
+    }
+
+    protected <T extends TransactionRequest> T testForwardToLocal(final TransactionRequest toForward,
+                                                                  final Class<T> expectedMessageClass) {
+        final Consumer<Response<?, ?>> callback = createCallbackMock();
+        final TransactionTester<LocalReadWriteProxyTransaction> transactionTester = createLocalProxy();
+        final LocalReadWriteProxyTransaction successor = transactionTester.getTransaction();
+        transaction.forwardToLocal(successor, toForward, callback);
+        return transactionTester.expectTransactionRequest(expectedMessageClass);
+    }
+
+    /**
+     * To emulate side effect of void method.
+     * {@link CursorAwareDataTreeModification#applyToCursor(DataTreeModificationCursor)}
+     *
+     * @param invocation invocation
+     * @return void - always null
+     */
+    protected Answer applyToCursorAnswer(final InvocationOnMock invocation) {
+        final DataTreeModificationCursor cursor =
+                invocation.getArgumentAt(0, DataTreeModificationCursor.class);
+        cursor.write(PATH_1.getLastPathArgument(), DATA_1);
+        cursor.merge(PATH_2.getLastPathArgument(), DATA_2);
+        cursor.delete(PATH_3.getLastPathArgument());
+        return null;
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java
new file mode 100644 (file)
index 0000000..68e106a
--- /dev/null
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.mockito.Mockito.when;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
+
+import akka.testkit.TestProbe;
+import com.google.common.base.VerifyException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+public class LocalReadOnlyProxyTransactionTest extends LocalProxyTransactionTest<LocalReadOnlyProxyTransaction> {
+
+    private DataTreeSnapshot snapshot;
+
+    @Override
+    protected LocalReadOnlyProxyTransaction createTransaction(final ProxyHistory parent,
+                                                              final TransactionIdentifier id,
+                                                              final DataTreeSnapshot snapshot) {
+        when(snapshot.readNode(PATH_1)).thenReturn(com.google.common.base.Optional.of(DATA_1));
+        when(snapshot.readNode(PATH_3)).thenReturn(com.google.common.base.Optional.absent());
+        this.snapshot = snapshot;
+        return new LocalReadOnlyProxyTransaction(parent, id, this.snapshot);
+    }
+
+    @Test
+    public void testIsSnapshotOnly() {
+        Assert.assertTrue(transaction.isSnapshotOnly());
+    }
+
+    @Test
+    public void testReadOnlyView() {
+        Assert.assertEquals(snapshot, transaction.readOnlyView());
+    }
+
+    @Override
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDirectCommit() throws Exception {
+        transaction.directCommit();
+    }
+
+    @Override
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCanCommit() throws Exception {
+        transaction.canCommit(new VotingFuture<>(new Object(), 1));
+    }
+
+    @Override
+    @Test(expected = UnsupportedOperationException.class)
+    public void testPreCommit() throws Exception {
+        transaction.preCommit(new VotingFuture<>(new Object(), 1));
+    }
+
+    @Override
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDoCommit() throws Exception {
+        transaction.doCommit(new VotingFuture<>(new Object(), 1));
+    }
+
+    @Override
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDelete() {
+        transaction.delete(PATH_1);
+    }
+
+    @Override
+    @Test(expected = UnsupportedOperationException.class)
+    public void testMerge() {
+        transaction.merge(PATH_1, DATA_1);
+    }
+
+    @Override
+    @Test(expected = UnsupportedOperationException.class)
+    public void testWrite() {
+        transaction.write(PATH_1, DATA_1);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDoDelete() {
+        transaction.doDelete(PATH_1);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDoMerge() {
+        transaction.doMerge(PATH_1, DATA_1);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDoWrite() {
+        transaction.doWrite(PATH_1, DATA_1);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCommitRequest() {
+        transaction.commitRequest(true);
+    }
+
+    @Test
+    public void testApplyModifyTransactionRequest() throws Exception {
+        final TestProbe probe = createProbe();
+        final ModifyTransactionRequestBuilder builder =
+                new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+        builder.setSequence(0);
+        builder.setAbort();
+        final ModifyTransactionRequest request = builder.build();
+        transaction.applyModifyTransactionRequest(request, createCallbackMock());
+        getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
+    }
+
+    @Test
+    public void testApplyModifyTransactionRequestNotAbort() throws Exception {
+        final TestProbe probe = createProbe();
+        final ModifyTransactionRequestBuilder builder =
+                new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+        builder.setSequence(0);
+        builder.setReady();
+        final ModifyTransactionRequest request = builder.build();
+        assertOperationThrowsException(() -> transaction.applyModifyTransactionRequest(request, createCallbackMock()),
+                VerifyException.class);
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java
new file mode 100644 (file)
index 0000000..4b07f3a
--- /dev/null
@@ -0,0 +1,248 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
+
+import akka.testkit.TestProbe;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTest<LocalReadWriteProxyTransaction> {
+    @Mock
+    private CursorAwareDataTreeModification modification;
+
+    @Override
+    protected LocalReadWriteProxyTransaction createTransaction(final ProxyHistory parent,
+                                                               final TransactionIdentifier id,
+                                                               final DataTreeSnapshot snapshot) {
+        when(snapshot.newModification()).thenReturn(modification);
+        when(modification.readNode(PATH_1)).thenReturn(com.google.common.base.Optional.of(DATA_1));
+        when(modification.readNode(PATH_3)).thenReturn(com.google.common.base.Optional.absent());
+        return new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
+    }
+
+    @Test
+    public void testIsSnapshotOnly() throws Exception {
+        Assert.assertFalse(transaction.isSnapshotOnly());
+    }
+
+    @Test
+    public void testReadOnlyView() throws Exception {
+        Assert.assertEquals(modification, transaction.readOnlyView());
+    }
+
+    @Test
+    @Override
+    public void testDelete() throws Exception {
+        transaction.delete(PATH_1);
+        verify(modification).delete(PATH_1);
+    }
+
+    @Test
+    @Override
+    public void testDirectCommit() throws Exception {
+        transaction.seal();
+        final ListenableFuture<Boolean> result = transaction.directCommit();
+        final TransactionTester<LocalReadWriteProxyTransaction> tester = getTester();
+        final CommitLocalTransactionRequest req = tester.expectTransactionRequest(CommitLocalTransactionRequest.class);
+        tester.replySuccess(new TransactionCommitSuccess(TRANSACTION_ID, req.getSequence()));
+        assertFutureEquals(true, result);
+    }
+
+    @Test
+    @Override
+    public void testCanCommit() throws Exception {
+        testRequestResponse(transaction::canCommit, CommitLocalTransactionRequest.class,
+                TransactionCanCommitSuccess::new);
+    }
+
+    @Test
+    @Override
+    public void testPreCommit() throws Exception {
+        testRequestResponse(transaction::preCommit, TransactionPreCommitRequest.class,
+                TransactionPreCommitSuccess::new);
+    }
+
+    @Test
+    @Override
+    public void testDoCommit() throws Exception {
+        testRequestResponse(transaction::doCommit, TransactionDoCommitRequest.class, TransactionCommitSuccess::new);
+    }
+
+    @Test
+    @Override
+    public void testMerge() throws Exception {
+        transaction.merge(PATH_1, DATA_1);
+        verify(modification).merge(PATH_1, DATA_1);
+    }
+
+    @Test
+    @Override
+    public void testWrite() throws Exception {
+        transaction.write(PATH_1, DATA_1);
+        verify(modification).write(PATH_1, DATA_1);
+    }
+
+    @Test
+    public void testCommitRequest() throws Exception {
+        transaction.doWrite(PATH_1, DATA_1);
+        final boolean coordinated = true;
+        final CommitLocalTransactionRequest request = transaction.commitRequest(coordinated);
+        Assert.assertEquals(coordinated, request.isCoordinated());
+        Assert.assertEquals(modification, request.getModification());
+    }
+
+    @Test
+    public void testModifyAfterCommitRequest() throws Exception {
+        transaction.doWrite(PATH_1, DATA_1);
+        final boolean coordinated = true;
+        transaction.commitRequest(coordinated);
+        assertOperationThrowsException(() -> transaction.doMerge(PATH_1, DATA_1), IllegalStateException.class);
+    }
+
+    @Test
+    public void testDoSeal() throws Exception {
+        assertOperationThrowsException(() -> transaction.getSnapshot(), IllegalStateException.class);
+        transaction.doSeal();
+        Assert.assertEquals(modification, transaction.getSnapshot());
+    }
+
+    @Test
+    public void testFlushState() throws Exception {
+        final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
+        final RemoteProxyTransaction successor = transactionTester.getTransaction();
+        doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
+        transaction.doSeal();
+        transaction.flushState(successor);
+        verify(modification).applyToCursor(any());
+        transactionTester.getTransaction().seal();
+        transactionTester.getTransaction().directCommit();
+        final ModifyTransactionRequest modifyRequest =
+                transactionTester.expectTransactionRequest(ModifyTransactionRequest.class);
+        checkModifications(modifyRequest);
+    }
+
+    @Test
+    public void testApplyModifyTransactionRequestCoordinated() throws Exception {
+        applyModifyTransactionRequest(true);
+    }
+
+    @Test
+    public void testApplyModifyTransactionRequestSimple() throws Exception {
+        applyModifyTransactionRequest(false);
+    }
+
+    @Test
+    public void testApplyModifyTransactionRequestAbort() throws Exception {
+        final TestProbe probe = createProbe();
+        final ModifyTransactionRequestBuilder builder =
+                new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+        builder.setSequence(0L);
+        builder.setAbort();
+        final ModifyTransactionRequest request = builder.build();
+        final Consumer<Response<?, ?>> callback = createCallbackMock();
+        transaction.applyModifyTransactionRequest(request, callback);
+        getTester().expectTransactionRequest(TransactionAbortRequest.class);
+    }
+
+    @Test
+    public void testHandleForwardedRemotePreCommitRequest() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionPreCommitRequest request =
+                new TransactionPreCommitRequest(TRANSACTION_ID, 0L, probe.ref());
+        testHandleForwardedRemoteRequest(request);
+    }
+
+    @Test
+    public void testHandleForwardedRemoteDoCommitRequest() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionDoCommitRequest request =
+                new TransactionDoCommitRequest(TRANSACTION_ID, 0L, probe.ref());
+        testHandleForwardedRemoteRequest(request);
+    }
+
+    @Test
+    public void testHandleForwardedRemoteAbortRequest() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionAbortRequest request =
+                new TransactionAbortRequest(TRANSACTION_ID, 0L, probe.ref());
+        testHandleForwardedRemoteRequest(request);
+    }
+
+    @Test
+    public void testForwardToLocalCommit() throws Exception {
+        final TestProbe probe = createProbe();
+        final DataTreeModification mod = mock(DataTreeModification.class);
+        final TransactionRequest<?> request =
+                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), mod, false);
+        testForwardToLocal(request, CommitLocalTransactionRequest.class);
+    }
+
+    @Test
+    public void testSendAbort() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionRequest<?> request = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
+        transaction.sendAbort(request, createCallbackMock());
+        assertOperationThrowsException(() -> transaction.delete(PATH_1), IllegalStateException.class);
+    }
+
+    private void applyModifyTransactionRequest(final boolean coordinated) {
+        final TestProbe probe = createProbe();
+        final ModifyTransactionRequestBuilder builder =
+                new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+        final TransactionModification write = new TransactionWrite(PATH_1, DATA_1);
+        final TransactionModification merge = new TransactionMerge(PATH_2, DATA_2);
+        final TransactionModification delete = new TransactionDelete(PATH_3);
+        builder.addModification(write);
+        builder.addModification(merge);
+        builder.addModification(delete);
+        builder.setSequence(0L);
+        builder.setCommit(coordinated);
+        final ModifyTransactionRequest request = builder.build();
+        final Consumer<Response<?, ?>> callback = createCallbackMock();
+        transaction.applyModifyTransactionRequest(request, callback);
+        verify(modification).write(PATH_1, DATA_1);
+        verify(modification).merge(PATH_2, DATA_2);
+        verify(modification).delete(PATH_3);
+        final CommitLocalTransactionRequest commitRequest =
+                getTester().expectTransactionRequest(CommitLocalTransactionRequest.class);
+        Assert.assertEquals(modification, commitRequest.getModification());
+        Assert.assertEquals(coordinated, commitRequest.isCoordinated());
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java
new file mode 100644 (file)
index 0000000..9d6122f
--- /dev/null
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
+
+import akka.testkit.TestProbe;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+public class RemoteProxyTransactionTest extends AbstractProxyTransactionTest<RemoteProxyTransaction> {
+
+    @Override
+    protected RemoteProxyTransaction createTransaction(final ProxyHistory parent, final TransactionIdentifier id,
+                                                       final DataTreeSnapshot snapshot) {
+        return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false);
+    }
+
+    @Override
+    @Test
+    public void testExists() throws Exception {
+        final TransactionTester<RemoteProxyTransaction> tester = getTester();
+        final CheckedFuture<Boolean, ReadFailedException> exists = transaction.exists(PATH_1);
+        final ExistsTransactionRequest req = tester.expectTransactionRequest(ExistsTransactionRequest.class);
+        final boolean existsResult = true;
+        tester.replySuccess(new ExistsTransactionSuccess(TRANSACTION_ID, req.getSequence(), existsResult));
+        assertFutureEquals(existsResult, exists);
+    }
+
+    @Override
+    @Test
+    public void testRead() throws Exception {
+        final TransactionTester<RemoteProxyTransaction> tester = getTester();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = transaction.read(PATH_2);
+        final ReadTransactionRequest req = tester.expectTransactionRequest(ReadTransactionRequest.class);
+        final Optional<NormalizedNode<?, ?>> result = Optional.of(DATA_1);
+        tester.replySuccess(new ReadTransactionSuccess(TRANSACTION_ID, req.getSequence(), result));
+        assertFutureEquals(result, read);
+    }
+
+    @Override
+    @Test
+    public void testWrite() throws Exception {
+        final YangInstanceIdentifier path = PATH_1;
+        testModification(() -> transaction.write(path, DATA_1), TransactionWrite.class, path);
+    }
+
+    @Override
+    @Test
+    public void testMerge() throws Exception {
+        final YangInstanceIdentifier path = PATH_2;
+        testModification(() -> transaction.merge(path, DATA_2), TransactionMerge.class, path);
+    }
+
+    @Override
+    @Test
+    public void testDelete() throws Exception {
+        final YangInstanceIdentifier path = PATH_3;
+        testModification(() -> transaction.delete(path), TransactionDelete.class, path);
+    }
+
+    @Override
+    @Test
+    public void testDirectCommit() throws Exception {
+        transaction.seal();
+        final ListenableFuture<Boolean> result = transaction.directCommit();
+        final TransactionTester<RemoteProxyTransaction> tester = getTester();
+        final ModifyTransactionRequest req = tester.expectTransactionRequest(ModifyTransactionRequest.class);
+        Assert.assertTrue(req.getPersistenceProtocol().isPresent());
+        Assert.assertEquals(PersistenceProtocol.SIMPLE, req.getPersistenceProtocol().get());
+        tester.replySuccess(new TransactionCommitSuccess(TRANSACTION_ID, req.getSequence()));
+        assertFutureEquals(true, result);
+    }
+
+    @Override
+    @Test
+    public void testCanCommit() throws Exception {
+        testRequestResponse(transaction::canCommit, ModifyTransactionRequest.class,
+                TransactionCanCommitSuccess::new);
+    }
+
+    @Override
+    @Test
+    public void testPreCommit() throws Exception {
+        testRequestResponse(transaction::preCommit, TransactionPreCommitRequest.class,
+                TransactionPreCommitSuccess::new);
+    }
+
+    @Override
+    @Test
+    public void testDoCommit() throws Exception {
+        testRequestResponse(transaction::doCommit, TransactionDoCommitRequest.class, TransactionCommitSuccess::new);
+    }
+
+    @Override
+    @Test
+    public void testForwardToRemoteAbort() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionAbortRequest request = new TransactionAbortRequest(TRANSACTION_ID, 0L, probe.ref());
+        testForwardToRemote(request, TransactionAbortRequest.class);
+
+    }
+
+    @Override
+    public void testForwardToRemoteCommit() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionAbortRequest request = new TransactionAbortRequest(TRANSACTION_ID, 0L, probe.ref());
+        testForwardToRemote(request, TransactionAbortRequest.class);
+    }
+
+    @Test
+    public void testForwardToRemoteModifyCommitSimple() throws Exception {
+        final TestProbe probe = createProbe();
+        final ModifyTransactionRequestBuilder builder =
+                new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+        builder.setSequence(0L);
+        builder.setCommit(false);
+        final ModifyTransactionRequest request = builder.build();
+        final ModifyTransactionRequest received = testForwardToRemote(request, ModifyTransactionRequest.class);
+        Assert.assertEquals(request.getPersistenceProtocol(), received.getPersistenceProtocol());
+        Assert.assertEquals(request.getModifications(), received.getModifications());
+        Assert.assertEquals(request.getTarget(), received.getTarget());
+    }
+
+    @Test
+    public void testForwardToRemoteModifyCommit3Phase() throws Exception {
+        final TestProbe probe = createProbe();
+        final ModifyTransactionRequestBuilder builder =
+                new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+        builder.setSequence(0L);
+        builder.setCommit(true);
+        final ModifyTransactionRequest request = builder.build();
+        final ModifyTransactionRequest received = testForwardToRemote(request, ModifyTransactionRequest.class);
+        Assert.assertEquals(request.getPersistenceProtocol(), received.getPersistenceProtocol());
+        Assert.assertEquals(request.getModifications(), received.getModifications());
+        Assert.assertEquals(request.getTarget(), received.getTarget());
+    }
+
+    @Test
+    public void testForwardToRemoteModifyAbort() throws Exception {
+        final TestProbe probe = createProbe();
+        final ModifyTransactionRequestBuilder builder =
+                new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+        builder.setSequence(0L);
+        builder.setAbort();
+        final ModifyTransactionRequest request = builder.build();
+        final TransactionAbortRequest received = testForwardToRemote(request, TransactionAbortRequest.class);
+        Assert.assertEquals(request.getTarget(), received.getTarget());
+    }
+
+    @Test
+    public void testForwardToRemoteModifyRead() throws Exception {
+        final TestProbe probe = createProbe();
+        final ReadTransactionRequest request =
+                new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, false);
+        final ReadTransactionRequest received = testForwardToRemote(request, ReadTransactionRequest.class);
+        Assert.assertEquals(request.getTarget(), received.getTarget());
+        Assert.assertEquals(request.getPath(), received.getPath());
+    }
+
+    @Test
+    public void testForwardToRemoteModifyExists() throws Exception {
+        final TestProbe probe = createProbe();
+        final ExistsTransactionRequest request =
+                new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, false);
+        final ExistsTransactionRequest received = testForwardToRemote(request, ExistsTransactionRequest.class);
+        Assert.assertEquals(request.getTarget(), received.getTarget());
+        Assert.assertEquals(request.getPath(), received.getPath());
+    }
+
+    @Test
+    public void testForwardToRemoteModifyPreCommit() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionPreCommitRequest request =
+                new TransactionPreCommitRequest(TRANSACTION_ID, 0L, probe.ref());
+        final TransactionPreCommitRequest received = testForwardToRemote(request, TransactionPreCommitRequest.class);
+        Assert.assertEquals(request.getTarget(), received.getTarget());
+    }
+
+    @Test
+    public void testForwardToRemoteModifyDoCommit() throws Exception {
+        final TestProbe probe = createProbe();
+        final TransactionDoCommitRequest request =
+                new TransactionDoCommitRequest(TRANSACTION_ID, 0L, probe.ref());
+        final TransactionDoCommitRequest received = testForwardToRemote(request, TransactionDoCommitRequest.class);
+        Assert.assertEquals(request.getTarget(), received.getTarget());
+    }
+
+
+    private <T extends TransactionModification> void testModification(final Runnable modification,
+                                                                      final Class<T> cls,
+                                                                      final YangInstanceIdentifier expectedPath) {
+        modification.run();
+        final ModifyTransactionRequest request = transaction.commitRequest(false);
+        final List<TransactionModification> modifications = request.getModifications();
+        Assert.assertEquals(1, modifications.size());
+        Assert.assertThat(modifications, hasItem(both(isA(cls)).and(hasPath(expectedPath))));
+    }
+
+}
\ No newline at end of file
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
@@ -27,22 +28,22 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 /**
  * Helper class. Allows checking messages received by backend and respond to them.
  */
-class TranasactionTester {
+class TransactionTester<T extends AbstractProxyTransaction> {
 
-    private final RemoteProxyTransaction transaction;
+    private final T transaction;
     private final AbstractClientConnection<ShardBackendInfo> connection;
     private final TestProbe backendProbe;
     private RequestEnvelope envelope;
 
-    TranasactionTester(final RemoteProxyTransaction transaction,
-                       final AbstractClientConnection<ShardBackendInfo> connection,
-                       final TestProbe backendProbe) {
+    TransactionTester(final T transaction,
+                      final AbstractClientConnection<ShardBackendInfo> connection,
+                      final TestProbe backendProbe) {
         this.transaction = transaction;
         this.connection = connection;
         this.backendProbe = backendProbe;
     }
 
-    RemoteProxyTransaction getTransaction() {
+    T getTransaction() {
         return transaction;
     }
 
@@ -50,10 +51,12 @@ class TranasactionTester {
         return (TransactionRequest) envelope.getMessage();
     }
 
-    <T extends TransactionRequest> T expectTransactionRequest(final Class<T> expected) {
+    <R extends TransactionRequest> R expectTransactionRequest(final Class<R> expected) {
         envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
-        Assert.assertTrue(expected.isAssignableFrom(envelope.getMessage().getClass()));
-        return (T) envelope.getMessage();
+        final Class<? extends Request> actual = envelope.getMessage().getClass();
+        final String errorMsg = String.format("Expected instance of %s, received %s", expected, actual);
+        Assert.assertTrue(errorMsg, expected.isAssignableFrom(actual));
+        return (R) envelope.getMessage();
     }
 
     void replySuccess(final RequestSuccess<?, ?> success) {