Add support for reusable streaming
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / test / java / org / opendaylight / controller / remote / rpc / RemoteRpcImplementationTest.java
index a70b90b07e58bf830cf972c415ded12e8983ebc7..5b3394eebd223dacf65df2616e65b8f98e9b5444 100644 (file)
@@ -11,54 +11,34 @@ package org.opendaylight.controller.remote.rpc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
 
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.japi.Pair;
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.util.Arrays;
-import java.util.Collections;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMRpcException;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-/***
+/**
  * Unit tests for RemoteRpcImplementation.
  *
  * @author Thomas Pantelis
  */
 public class RemoteRpcImplementationTest extends AbstractRpcTest {
 
-
-
-    @Test(expected = DOMRpcImplementationNotAvailableException.class)
-    public void testInvokeRpcWithNoRemoteActor() throws Exception {
-        final ContainerNode input = makeRPCInput("foo");
-        final CheckedFuture<DOMRpcResult, DOMRpcException> failedFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, input);
-        rpcRegistry1Probe.expectMsgClass(JavaTestKit.duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
-        rpcRegistry1Probe
-                .reply(new RpcRegistry.Messages.FindRoutersReply(Collections.<Pair<ActorRef, Long>>emptyList()));
-        failedFuture.checkedGet(5, TimeUnit.SECONDS);
-    }
-
-
     /**
-     * This test method invokes and executes the remote rpc
+     * This test method invokes and executes the remote rpc.
      */
     @Test
     public void testInvokeRpc() throws Exception {
@@ -71,25 +51,17 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
                 (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
 
         when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
-                Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+                FluentFutures.immediateFluentFuture(rpcResult));
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
-                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        final ListenableFuture<DOMRpcResult> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
-        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
-                rpcBroker2, 200L))));
 
-        final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+        final DOMRpcResult result = frontEndFuture.get(5, TimeUnit.SECONDS);
         assertEquals(rpcOutput, result.getResult());
     }
 
     /**
-     * This test method invokes and executes the remote rpc
+     * This test method invokes and executes the remote rpc.
      */
     @Test
     public void testInvokeRpcWithNullInput() throws Exception {
@@ -101,26 +73,17 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
                 (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
 
         when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
-                Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+                FluentFutures.immediateFluentFuture(rpcResult));
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
-                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, null);
+        ListenableFuture<DOMRpcResult> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, null);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
-        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
-                rpcBroker2, 200L))));
 
-        final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+        final DOMRpcResult result = frontEndFuture.get(5, TimeUnit.SECONDS);
         assertEquals(rpcOutput, result.getResult());
     }
 
-
     /**
-     * This test method invokes and executes the remote rpc
+     * This test method invokes and executes the remote rpc.
      */
     @Test
     public void testInvokeRpcWithNoOutput() throws Exception {
@@ -133,77 +96,51 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
                 (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
 
         when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
-                Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+                FluentFutures.immediateFluentFuture(rpcResult));
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
-                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        final ListenableFuture<DOMRpcResult> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
-        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
-                rpcBroker2, 200L))));
 
-        final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+        final DOMRpcResult result = frontEndFuture.get(5, TimeUnit.SECONDS);
         assertNull(result.getResult());
     }
 
-
     /**
-     * This test method invokes and executes the remote rpc
+     * This test method invokes and executes the remote rpc.
      */
+    @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
     @Test(expected = DOMRpcException.class)
-    public void testInvokeRpcWithRemoteFailedFuture() throws Exception {
-        final ContainerNode rpcOutput = null;
-        final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
-
+    public void testInvokeRpcWithRemoteFailedFuture() throws Throwable {
         final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
         @SuppressWarnings({"unchecked", "rawtypes"})
         final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
                 (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
 
         when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
-                Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcException(
-                        "Test Exception") {}));
+                FluentFutures.immediateFailedFluentFuture(new RemoteDOMRpcException("Test Exception", null)));
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
-                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        final ListenableFuture<DOMRpcResult> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
-        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
-                rpcBroker2, 200L))));
-        frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+
+        try {
+            frontEndFuture.get(5, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
     }
 
     /**
      * This test method invokes and tests exceptions when akka timeout occured
-     *
-     * Currently ignored since this test with current config takes around 15 seconds
-     * to complete.
-     *
+     * Currently ignored since this test with current config takes around 15 seconds to complete.
      */
     @Ignore
     @Test(expected = RemoteDOMRpcException.class)
     public void testInvokeRpcWithAkkaTimeoutException() throws Exception {
         final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
-                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
-        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
-                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        final ListenableFuture<DOMRpcResult> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
 
-        frontEndFuture.checkedGet(20, TimeUnit.SECONDS);
+        frontEndFuture.get(20, TimeUnit.SECONDS);
     }
 
     /**
@@ -211,35 +148,20 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
      * with runtime exception.
      */
     @Test(expected = DOMRpcException.class)
-    public void testInvokeRpcWithLookupException() throws Exception {
+    @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
+    public void testInvokeRpcWithLookupException() throws Throwable {
         final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
-                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
-        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
-                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
-        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-        rpcRegistry1Probe.reply( new Status.Failure(new RuntimeException("test")));
-        frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
-    }
-
-    /**
-     * This test method invokes and executes the remote rpc
-     */
-    @Test(expected = DOMRpcImplementationNotAvailableException.class)
-    public void testInvokeRpcWithLoopException() throws Exception {
-        final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(NormalizedNodeSerializer.serialize(makeRPCInput("foo")));
-        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
 
-        frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
-    }
+        doThrow(new RuntimeException("test")).when(domRpcService2).invokeRpc(any(SchemaPath.class),
+            any(NormalizedNode.class));
 
+        final ListenableFuture<DOMRpcResult> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
 
-    private RemoteRpcProviderConfig getConfig() {
-        return new RemoteRpcProviderConfig.Builder("unit-test").build();
+        try {
+            frontEndFuture.get(5, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
     }
 }