Add AbstractTransactionCommitCohort unit tests 98/53398/9
authorAndrej Mak <andrej.mak@pantheon.tech>
Thu, 16 Mar 2017 12:24:32 +0000 (13:24 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 20 Mar 2017 10:21:02 +0000 (10:21 +0000)
Change-Id: I18036259e022bfb3d027c82757a8c840cebb8ded
Signed-off-by: Andrej Mak <andrej.mak@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehaviorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohortTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TestUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TranasactionTester.java [new file with mode: 0644]

index 4f6625b6c5d0da0fb5d812b0b2e5b2b76e37f373..9929b402ca94e8d1a3a924126198bfd4e7e59338 100644 (file)
@@ -22,6 +22,11 @@ public class AccessClientUtil {
         return new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id);
     }
 
         return new ClientActorContext(actor, system.scheduler(), system.dispatcher(), persistenceId, id);
     }
 
+    public static <T extends BackendInfo> ConnectedClientConnection<T> createConnectedConnection(
+            final ClientActorContext context, final Long cookie, final T backend) {
+        return new ConnectedClientConnection<>(context, cookie, backend);
+    }
+
     public static void completeRequest(final AbstractClientConnection<? extends BackendInfo> connection,
                                        final ResponseEnvelope<?> envelope) {
         connection.receiveResponse(envelope);
     public static void completeRequest(final AbstractClientConnection<? extends BackendInfo> connection,
                                        final ResponseEnvelope<?> envelope) {
         connection.receiveResponse(envelope);
index 95791788b1d1bb4473e866beafe8fd5ecff2b54a..3dfaac50b2b7eaedb6eac84e6784f052013ffe7c 100644 (file)
@@ -11,6 +11,9 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -33,20 +36,14 @@ import org.opendaylight.controller.cluster.access.client.InternalCommand;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Envelope;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.Envelope;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendType;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -56,14 +53,8 @@ import scala.concurrent.Promise;
 
 public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<AbstractProxyTransaction>> {
 
 
 public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<AbstractProxyTransaction>> {
 
-    private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
-    private static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
-    private static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
-    private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
-    private static final LocalHistoryIdentifier HISTORY_ID = new LocalHistoryIdentifier(CLIENT_ID, 0L);
     private static final String PERSISTENCE_ID = "per-1";
     private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
     private static final String PERSISTENCE_ID = "per-1";
     private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
-    protected static final TransactionIdentifier TRANSACTION_ID = new TransactionIdentifier(HISTORY_ID, 0L);
 
     @Mock
     private DataTree dataTree;
 
     @Mock
     private DataTree dataTree;
index 7ababb8ca894cc50c9f4b93e9a72b46b08bbad6d..618933e0685cee1fc5a7830aaae4ab5e8f3bef54 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -28,10 +29,6 @@ import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.InternalCommand;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
 import org.opendaylight.controller.cluster.access.client.InternalCommand;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendType;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -43,10 +40,6 @@ import scala.concurrent.Promise;
 public abstract class AbstractDataStoreClientBehaviorTest {
 
     protected static final String SHARD = "default";
 public abstract class AbstractDataStoreClientBehaviorTest {
 
     protected static final String SHARD = "default";
-    private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
-    private static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
-    private static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
-    private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
     private static final String PERSISTENCE_ID = "per-1";
 
     private ActorSystem system;
     private static final String PERSISTENCE_ID = "per-1";
 
     private ActorSystem system;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java
new file mode 100644 (file)
index 0000000..9497157
--- /dev/null
@@ -0,0 +1,240 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+
+public class ClientTransactionCommitCohortTest {
+
+    private static final String PERSISTENCE_ID = "per-1";
+    private static final int TRANSACTIONS = 3;
+
+    @Mock
+    private AbstractClientHistory history;
+    private ActorSystem system;
+    private List<TranasactionTester> transactions;
+    private ClientTransactionCommitCohort cohort;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.apply();
+        final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
+        final ClientActorContext context =
+                AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+        transactions = new ArrayList<>();
+        for (int i = 0; i < TRANSACTIONS; i++) {
+            transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
+        }
+        final Collection<AbstractProxyTransaction> proxies = transactions.stream()
+                .map(TranasactionTester::getTransaction)
+                .collect(Collectors.toList());
+        proxies.forEach(AbstractProxyTransaction::seal);
+        cohort = new ClientTransactionCommitCohort(history, TRANSACTION_ID, proxies);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system);
+    }
+
+    @Test
+    public void testCanCommit() throws Exception {
+        testOpSuccess(ClientTransactionCommitCohort::canCommit, this::expectCanCommit,
+                this::replyCanCommitSuccess, true);
+    }
+
+    @Test
+    public void testCanCommitFail() throws Exception {
+        testOpFail(ClientTransactionCommitCohort::canCommit, this::expectCanCommit, this::replyCanCommitSuccess);
+    }
+
+    @Test
+    public void testPreCommit() throws Exception {
+        testOpSuccess(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess,
+                null);
+    }
+
+    @Test
+    public void testPreCommitFail() throws Exception {
+        testOpFail(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess);
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        testOpSuccess(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess, null);
+    }
+
+    @Test
+    public void testCommitFail() throws Exception {
+        testOpFail(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess);
+    }
+
+    @Test
+    public void testAbort() throws Exception {
+        testOpSuccess(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess, null);
+    }
+
+    @Test
+    public void testAbortFail() throws Exception {
+        testOpFail(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess);
+    }
+
+    private void expectCanCommit(final TranasactionTester tester) {
+        final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
+        Assert.assertTrue(request.getPersistenceProtocol().isPresent());
+        Assert.assertEquals(PersistenceProtocol.THREE_PHASE, request.getPersistenceProtocol().get());
+    }
+
+    void expectPreCommit(final TranasactionTester tester) {
+        tester.expectTransactionRequest(TransactionPreCommitRequest.class);
+    }
+
+    void expectCommit(final TranasactionTester tester) {
+        tester.expectTransactionRequest(TransactionDoCommitRequest.class);
+    }
+
+    void expectAbort(final TranasactionTester tester) {
+        tester.expectTransactionRequest(TransactionAbortRequest.class);
+    }
+
+    void replyCanCommitSuccess(final TranasactionTester tester) {
+        final RequestSuccess<?, ?> success = new TransactionCanCommitSuccess(tester.getTransaction().getIdentifier(),
+                tester.getLastReceivedMessage().getSequence());
+        tester.replySuccess(success);
+    }
+
+    void replyPreCommitSuccess(final TranasactionTester tester) {
+        final RequestSuccess<?, ?> success = new TransactionPreCommitSuccess(tester.getTransaction().getIdentifier(),
+                tester.getLastReceivedMessage().getSequence());
+        tester.replySuccess(success);
+    }
+
+    void replyCommitSuccess(final TranasactionTester tester) {
+        final RequestSuccess<?, ?> success = new TransactionCommitSuccess(tester.getTransaction().getIdentifier(),
+                tester.getLastReceivedMessage().getSequence());
+        tester.replySuccess(success);
+    }
+
+    void replyAbortSuccess(final TranasactionTester tester) {
+        final RequestSuccess<?, ?> success = new TransactionAbortSuccess(tester.getTransaction().getIdentifier(),
+                tester.getLastReceivedMessage().getSequence());
+        tester.replySuccess(success);
+    }
+
+    private static TranasactionTester createTransactionTester(final TestProbe backendProbe,
+                                                              final ClientActorContext context,
+                                                              final AbstractClientHistory history) {
+        final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+                "default", UnsignedLong.ZERO, Optional.empty(), 3);
+        final AbstractClientConnection<ShardBackendInfo> connection =
+                AccessClientUtil.createConnectedConnection(context, 0L, backend);
+        final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
+        final RemoteProxyTransaction transaction =
+                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+        return new TranasactionTester(transaction, connection, backendProbe);
+    }
+
+    private void replySuccess(final Collection<TranasactionTester> transactions,
+                              final Consumer<TranasactionTester> expect,
+                              final Consumer<TranasactionTester> reply) {
+        for (final TranasactionTester transaction : transactions) {
+            expect.accept(transaction);
+            reply.accept(transaction);
+        }
+    }
+
+    /**
+     * Test operation success. Invokes given operation, which initiates message to the backend.
+     * Received message is checked by expectFunction. Then replyFunction is invoked. Expected result is compared
+     * to the operation future result.
+     *
+     * @param operation      operation
+     * @param expectFunction expected message check
+     * @param replyFunction  response function
+     * @param expectedResult expected operation result
+     * @param <T>            type
+     * @throws Exception unexpected exception
+     */
+    private <T> void testOpSuccess(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
+                                   final Consumer<TranasactionTester> expectFunction,
+                                   final Consumer<TranasactionTester> replyFunction,
+                                   final T expectedResult) throws Exception {
+        final ListenableFuture<T> result = operation.apply(cohort);
+        replySuccess(transactions, expectFunction, replyFunction);
+        Assert.assertEquals(expectedResult, getWithTimeout(result));
+    }
+
+    /**
+     * Test operation failure. Invokes given operation, which initiates message to the backend.
+     * Received message is checked by expectFunction. Then replyFunction is invoked. One of the transactions in
+     * cohort receives failure response.
+     *
+     * @param operation      operation
+     * @param expectFunction expected message check
+     * @param replyFunction  response function
+     * @param <T>            type
+     * @throws Exception unexpected exception
+     */
+    private <T> void testOpFail(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
+                                final Consumer<TranasactionTester> expectFunction,
+                                final Consumer<TranasactionTester> replyFunction) throws Exception {
+        final ListenableFuture<T> canCommit = operation.apply(cohort);
+        //reply success to all except last transaction
+        replySuccess(transactions.subList(0, transactions.size() - 1), expectFunction, replyFunction);
+        //reply fail to last transaction
+        final TranasactionTester last = transactions.get(transactions.size() - 1);
+        expectFunction.accept(last);
+        final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException());
+        last.replyFailure(cause);
+        //check future fail
+        final ExecutionException exception =
+                assertOperationThrowsException(() -> getWithTimeout(canCommit), ExecutionException.class);
+        Assert.assertEquals(cause, exception.getCause());
+    }
+
+}
\ No newline at end of file
index 7b10cefbe1890ef9be6e5234058da38d56bc2f97..5834833e688f33b26698b6159da62c6504ae1109 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java
new file mode 100644 (file)
index 0000000..9c92afb
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Optional;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+
+public class DirectTransactionCommitCohortTest {
+
+    private static final String PERSISTENCE_ID = "per-1";
+
+    @Mock
+    private AbstractClientHistory history;
+    private ActorSystem system;
+    private TranasactionTester transaction;
+    private DirectTransactionCommitCohort cohort;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.apply();
+        final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
+        final ClientActorContext context =
+                AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+        transaction = createTransactionTester(new TestProbe(system, "backend"), context, history);
+        final AbstractProxyTransaction proxy = transaction.getTransaction();
+        proxy.seal();
+        cohort = new DirectTransactionCommitCohort(history, TRANSACTION_ID, proxy);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system);
+    }
+
+    @Test
+    public void testCanCommit() throws Exception {
+        final ListenableFuture<Boolean> canCommit = cohort.canCommit();
+        final ModifyTransactionRequest request = transaction.expectTransactionRequest(ModifyTransactionRequest.class);
+        Assert.assertTrue(request.getPersistenceProtocol().isPresent());
+        Assert.assertEquals(PersistenceProtocol.SIMPLE, request.getPersistenceProtocol().get());
+        final RequestSuccess<?, ?> success = new TransactionCommitSuccess(transaction.getTransaction().getIdentifier(),
+                transaction.getLastReceivedMessage().getSequence());
+        transaction.replySuccess(success);
+        Assert.assertTrue(getWithTimeout(canCommit));
+    }
+
+    @Test
+    public void testPreCommit() throws Exception {
+        final ListenableFuture<Void> preCommit = cohort.preCommit();
+        Assert.assertNull(getWithTimeout(preCommit));
+    }
+
+    @Test
+    public void testAbort() throws Exception {
+        final ListenableFuture<Void> abort = cohort.abort();
+        verify(history).onTransactionComplete(transaction.getTransaction().getIdentifier());
+        Assert.assertNull(getWithTimeout(abort));
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        final ListenableFuture<Void> commit = cohort.commit();
+        verify(history).onTransactionComplete(transaction.getTransaction().getIdentifier());
+        Assert.assertNull(getWithTimeout(commit));
+    }
+
+    private static TranasactionTester createTransactionTester(final TestProbe backendProbe,
+                                                              final ClientActorContext context,
+                                                              final AbstractClientHistory history) {
+        final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+                "default", UnsignedLong.ZERO, Optional.empty(), 3);
+        final AbstractClientConnection<ShardBackendInfo> connection =
+                AccessClientUtil.createConnectedConnection(context, 0L, backend);
+        final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
+        final RemoteProxyTransaction transaction =
+                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+        return new TranasactionTester(transaction, connection, backendProbe);
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohortTest.java
new file mode 100644 (file)
index 0000000..024f88f
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class EmptyTransactionCommitCohortTest {
+
+    @Mock
+    private AbstractClientHistory history;
+
+    private EmptyTransactionCommitCohort cohort;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        cohort = new EmptyTransactionCommitCohort(history, TRANSACTION_ID);
+    }
+
+    @Test
+    public void testCanCommit() throws Exception {
+        final ListenableFuture<Boolean> canCommit = cohort.canCommit();
+        Assert.assertTrue(getWithTimeout(canCommit));
+    }
+
+    @Test
+    public void testPreCommit() throws Exception {
+        final ListenableFuture<Void> preCommit = cohort.preCommit();
+        Assert.assertNull(getWithTimeout(preCommit));
+    }
+
+    @Test
+    public void testAbort() throws Exception {
+        final ListenableFuture<Void> abort = cohort.abort();
+        verify(history).onTransactionComplete(TRANSACTION_ID);
+        Assert.assertNull(getWithTimeout(abort));
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        final ListenableFuture<Void> commit = cohort.commit();
+        verify(history).onTransactionComplete(TRANSACTION_ID);
+        Assert.assertNull(getWithTimeout(commit));
+    }
+
+}
\ No newline at end of file
index d713ba4d3caef7e505d5402ec50108d2512cda39..4c82357d93cfb5ddc99bcc1e0c7ccca898737428 100644 (file)
@@ -10,9 +10,22 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 class TestUtils {
 
 
 class TestUtils {
 
+    static final MemberName MEMBER_NAME = MemberName.forName("member-1");
+    static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
+    static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
+    static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
+    static final LocalHistoryIdentifier HISTORY_ID = new LocalHistoryIdentifier(CLIENT_ID, 0L);
+    static final TransactionIdentifier TRANSACTION_ID = new TransactionIdentifier(HISTORY_ID, 0L);
+
     @FunctionalInterface
     public interface RunnableWithException {
         void run() throws Exception;
     @FunctionalInterface
     public interface RunnableWithException {
         void run() throws Exception;
@@ -56,9 +69,9 @@ class TestUtils {
      */
     //Throwable is propagated if doesn't match the expected type
     @SuppressWarnings("checkstyle:IllegalCatch")
      */
     //Throwable is propagated if doesn't match the expected type
     @SuppressWarnings("checkstyle:IllegalCatch")
-    static Throwable assertOperationThrowsException(final RunnableWithException operation,
-                                                    final Class<? extends Throwable> expectedException,
-                                                    final String message) throws Exception {
+    static <T extends Throwable> T assertOperationThrowsException(final RunnableWithException operation,
+                                                                  final Class<T> expectedException,
+                                                                  final String message) throws Exception {
         try {
             operation.run();
             throw new AssertionError(message + expectedException);
         try {
             operation.run();
             throw new AssertionError(message + expectedException);
@@ -66,7 +79,7 @@ class TestUtils {
             if (!e.getClass().equals(expectedException)) {
                 throw e;
             }
             if (!e.getClass().equals(expectedException)) {
                 throw e;
             }
-            return e;
+            return (T) e;
         }
     }
 
         }
     }
 
@@ -78,8 +91,8 @@ class TestUtils {
      * @return expected exception instance. Can be used for additional assertions.
      * @throws Exception unexpected exception.
      */
      * @return expected exception instance. Can be used for additional assertions.
      * @throws Exception unexpected exception.
      */
-    static Throwable assertOperationThrowsException(final RunnableWithException operation,
-                                                    final Class<? extends Throwable> expectedException)
+    static <T extends Throwable> T assertOperationThrowsException(final RunnableWithException operation,
+                                                                  final Class<T> expectedException)
             throws Exception {
         return assertOperationThrowsException(operation, expectedException, "Operation should throw exception: ");
     }
             throws Exception {
         return assertOperationThrowsException(operation, expectedException, "Operation should throw exception: ");
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TranasactionTester.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TranasactionTester.java
new file mode 100644 (file)
index 0000000..b9dc0f1
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.testkit.TestProbe;
+import javax.annotation.Nonnull;
+import org.junit.Assert;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
+import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
+import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Helper class. Allows checking messages received by backend and respond to them.
+ */
+class TranasactionTester {
+
+    private final RemoteProxyTransaction transaction;
+    private final AbstractClientConnection<ShardBackendInfo> connection;
+    private final TestProbe backendProbe;
+    private RequestEnvelope envelope;
+
+    TranasactionTester(final RemoteProxyTransaction transaction,
+                       final AbstractClientConnection<ShardBackendInfo> connection,
+                       final TestProbe backendProbe) {
+        this.transaction = transaction;
+        this.connection = connection;
+        this.backendProbe = backendProbe;
+    }
+
+    RemoteProxyTransaction getTransaction() {
+        return transaction;
+    }
+
+    TransactionRequest getLastReceivedMessage() {
+        return (TransactionRequest) envelope.getMessage();
+    }
+
+    <T extends TransactionRequest> T expectTransactionRequest(final Class<T> expected) {
+        envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
+        Assert.assertTrue(expected.isAssignableFrom(envelope.getMessage().getClass()));
+        return (T) envelope.getMessage();
+    }
+
+    void replySuccess(final RequestSuccess<?, ?> success) {
+        final long sessionId = envelope.getSessionId();
+        final long txSequence = envelope.getTxSequence();
+        final long executionTime = 0L;
+        final SuccessEnvelope responseEnvelope = new SuccessEnvelope(success, sessionId, txSequence, executionTime);
+        AccessClientUtil.completeRequest(connection, responseEnvelope);
+    }
+
+    void replyFailure(final RequestException cause) {
+        final long sessionId = envelope.getSessionId();
+        final long txSequence = envelope.getTxSequence();
+        final long executionTime = 0L;
+        final RequestFailure<?, ?> fail =
+                new MockFailure(transaction.getIdentifier(), envelope.getMessage().getSequence(), cause);
+        final FailureEnvelope responseEnvelope = new FailureEnvelope(fail, sessionId, txSequence, executionTime);
+        AccessClientUtil.completeRequest(connection, responseEnvelope);
+    }
+
+    private static class MockFailure extends RequestFailure<TransactionIdentifier, TransactionFailure> {
+        private MockFailure(@Nonnull final TransactionIdentifier target, final long sequence,
+                            @Nonnull final RequestException cause) {
+            super(target, sequence, cause);
+        }
+
+        @Nonnull
+        @Override
+        protected TransactionFailure cloneAsVersion(@Nonnull final ABIVersion targetVersion) {
+            throw new UnsupportedOperationException("Not implemented");
+        }
+
+        @Override
+        protected AbstractRequestFailureProxy<TransactionIdentifier, TransactionFailure> externalizableProxy(
+                @Nonnull final ABIVersion version) {
+            throw new UnsupportedOperationException("Not implemented");
+        }
+    }
+}