Removed uncessary calls to RpcBroker to find routes. 84/22584/1
authorTony Tkacik <ttkacik@cisco.com>
Thu, 11 Jun 2015 15:19:46 +0000 (17:19 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 15 Jun 2015 13:29:13 +0000 (13:29 +0000)
Moved invokeRpc part to RemoteRpcImplementation
which allowed to do ask for lookups in RemoteRpcImplementation.

This changed role of RpcBroker part to only delegate
to MD-SAL, if RPC was received via Akka.

remote.rpc.RpcBroker interaction model represented
multi-stepped pipeline which resulted in following
message pattern

RemoteRpcImplementaion ->
RpcBroker#InvokeRpc ->
RpcRegistry#FindRoutes ->
RpcBroker#ExecuteRpc

InvokeRpc only did lookup in FindRoutes and all
outgoing messages needed to pass via RpcBroker.
Unfortunatelly this also prevented lookup
of any RPC Path during executing RPC in MD-SAL.

Change-Id: I6e84bfcb74b71f7417c2d3f8c35a7f8b0406caf9
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
(cherry picked from commit 9216287a4d1fc310f81f1956685f4e6deb7eefa8)

13 files changed:
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java

index e97a499..c6b796d 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.remote.rpc;
 
 import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.CheckedFuture;
@@ -19,22 +20,38 @@ import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
+/**
+ * @author tony
+ *
+ */
 class RemoteDOMRpcFuture extends AbstractFuture<DOMRpcResult> implements CheckedFuture<DOMRpcResult, DOMRpcException> {
 
     private static final Logger LOG = LoggerFactory.getLogger(RemoteDOMRpcFuture.class);
 
-    private RemoteDOMRpcFuture(final Future<Object> future) {
-        future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global());
+    private final QName rpcName;
+
+    private RemoteDOMRpcFuture(final QName rpcName) {
+        this.rpcName = Preconditions.checkNotNull(rpcName,"rpcName");
     }
 
-    public static CheckedFuture<DOMRpcResult, DOMRpcException> from(final Future<Object> future) {
-        return new RemoteDOMRpcFuture(future);
+    public static RemoteDOMRpcFuture create(final QName rpcName) {
+        return new RemoteDOMRpcFuture(rpcName);
+    }
+
+    protected void failNow(final Throwable error) {
+        LOG.debug("Failing future {} for rpc {}", this, rpcName, error);
+        setException(error);
+    }
+
+    protected void completeWith(final Future<Object> future) {
+        future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global());
     }
 
     @Override
@@ -72,20 +89,21 @@ class RemoteDOMRpcFuture extends AbstractFuture<DOMRpcResult> implements Checked
         @Override
         public void onComplete(final Throwable error, final Object reply) throws Throwable {
             if (error != null) {
-                RemoteDOMRpcFuture.this.setException(error);
+                RemoteDOMRpcFuture.this.failNow(error);
             } else if (reply instanceof RpcResponse) {
                 final RpcResponse rpcReply = (RpcResponse) reply;
                 final NormalizedNode<?, ?> result;
                 if (rpcReply.getResultNormalizedNode() == null) {
                     result = null;
-                    LOG.debug("Received response for invoke rpc: result is null");
+                    LOG.debug("Received response for rpc {}: result is null", rpcName);
                 } else {
                     result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
-                    LOG.debug("Received response for invoke rpc: result is {}", result);
+                    LOG.debug("Received response for rpc {}: result is {}", rpcName, result);
                 }
                 RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result));
+                LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName);
             }
-            RemoteDOMRpcFuture.this.setException(new IllegalStateException("Incorrect reply type " + reply
+            RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply
                     + "from Akka"));
         }
     }
index 404a109..2886fd9 100644 (file)
@@ -3,27 +3,77 @@ package org.opendaylight.controller.remote.rpc;
 import static akka.pattern.Patterns.ask;
 
 import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.japi.Pair;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.List;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
+import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
 
 public class RemoteRpcImplementation implements DOMRpcImplementation {
-    private final ActorRef rpcBroker;
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+
+    private final ActorRef rpcRegistry;
     private final RemoteRpcProviderConfig config;
 
-    public RemoteRpcImplementation(final ActorRef rpcBroker, final RemoteRpcProviderConfig config) {
-        this.rpcBroker = rpcBroker;
+    public RemoteRpcImplementation(final ActorRef rpcRegistry, final RemoteRpcProviderConfig config) {
         this.config = config;
+        this.rpcRegistry = rpcRegistry;
     }
 
     @Override
-    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
-        final InvokeRpc rpcMsg = new InvokeRpc(rpc.getType().getLastComponent(), rpc.getContextReference(), input);
-        final scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
-        return RemoteDOMRpcFuture.from(future);
+    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final DOMRpcIdentifier rpc,
+            final NormalizedNode<?, ?> input) {
+        if (input instanceof RemoteRpcInput) {
+            LOG.warn("Rpc {} was removed during execution or there is loop present. Failing received rpc.", rpc);
+            return Futures
+                    .<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException(
+                            "Rpc implementation for {} was removed during processing.", rpc));
+        }
+        final RemoteDOMRpcFuture frontEndFuture = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
+        findRouteAsync(rpc).onComplete(new OnComplete<FindRoutersReply>() {
+
+            @Override
+            public void onComplete(final Throwable error, final FindRoutersReply routes) throws Throwable {
+                if (error != null) {
+                    frontEndFuture.failNow(error);
+                } else {
+                    final List<Pair<ActorRef, Long>> routePairs = routes.getRouterWithUpdateTime();
+                    if (routePairs == null || routePairs.isEmpty()) {
+                        frontEndFuture.failNow(new DOMRpcImplementationNotAvailableException(
+                                "No local or remote implementation available for rpc %s", rpc.getType(), error));
+                    } else {
+                        final ActorRef remoteImplRef = new LatestEntryRoutingLogic(routePairs).select();
+                        final Object executeRpcMessage = ExecuteRpc.from(rpc, input);
+                        LOG.debug("Found remote actor {} for rpc {} - sending {}", remoteImplRef, rpc.getType(), executeRpcMessage);
+                        frontEndFuture.completeWith(ask(remoteImplRef, executeRpcMessage, config.getAskDuration()));
+                    }
+                }
+            }
+        }, ExecutionContext.Implicits$.MODULE$.global());
+        return frontEndFuture;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private Future<FindRoutersReply> findRouteAsync(final DOMRpcIdentifier rpc) {
+        // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
+        final RpcRouter.RouteIdentifier<?, ?, ?> routeId =
+                new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
+        final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+        return (Future) ask(rpcRegistry, findMsg, config.getAskDuration());
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java
new file mode 100644 (file)
index 0000000..3c77220
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+
+class RemoteRpcInput implements ContainerNode {
+
+    private final ContainerNode delegate;
+
+    private RemoteRpcInput(final ContainerNode delegate) {
+        this.delegate = delegate;
+    }
+
+    protected static RemoteRpcInput from(final Node node) {
+        if(node == null) {
+            return null;
+        }
+        final NormalizedNode<?, ?> deserialized = NormalizedNodeSerializer.deSerialize(node);
+        Preconditions.checkArgument(deserialized instanceof ContainerNode);
+        return new RemoteRpcInput((ContainerNode) deserialized);
+    }
+
+    ContainerNode delegate() {
+        return delegate;
+    }
+
+    @Override
+    public Map<QName, String> getAttributes() {
+        return delegate().getAttributes();
+    }
+
+    @Override
+    public Object getAttributeValue(final QName name) {
+        return delegate().getAttributeValue(name);
+    }
+
+    @Override
+    public QName getNodeType() {
+        return delegate().getNodeType();
+    }
+
+    @Override
+    public Collection<DataContainerChild<? extends PathArgument, ?>> getValue() {
+        return delegate().getValue();
+    }
+
+    @Override
+    public NodeIdentifier getIdentifier() {
+        return delegate().getIdentifier();
+    }
+
+    @Override
+    public Optional<DataContainerChild<? extends PathArgument, ?>> getChild(final PathArgument child) {
+        return delegate().getChild(child);
+    }
+
+    @Override
+    public int hashCode() {
+        return delegate().hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        return delegate().equals(obj);
+    }
+}
index d88bb88..0c5315c 100644 (file)
@@ -8,36 +8,23 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import static akka.pattern.Patterns.ask;
-
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.dispatch.OnComplete;
 import akka.japi.Creator;
-import akka.japi.Pair;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
-import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -53,162 +40,82 @@ import org.slf4j.LoggerFactory;
 public class RpcBroker extends AbstractUntypedActor {
 
     private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
-    private final ActorRef rpcRegistry;
-    private final RemoteRpcProviderConfig config;
     private final DOMRpcService rpcService;
 
-    private RpcBroker(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+    private RpcBroker(final DOMRpcService rpcService) {
         this.rpcService = rpcService;
-        this.rpcRegistry = rpcRegistry;
-        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
     }
 
-    public static Props props(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
-        Preconditions.checkNotNull(rpcRegistry, "ActorRef can not be null!");
+    public static Props props(final DOMRpcService rpcService) {
         Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
-        return Props.create(new RpcBrokerCreator(rpcService, rpcRegistry));
+        return Props.create(new RpcBrokerCreator(rpcService));
     }
 
     @Override
     protected void handleReceive(final Object message) throws Exception {
-        if(message instanceof InvokeRpc) {
-            invokeRemoteRpc((InvokeRpc) message);
-        } else if(message instanceof ExecuteRpc) {
+        if (message instanceof ExecuteRpc) {
             executeRpc((ExecuteRpc) message);
         }
     }
 
-    private void invokeRemoteRpc(final InvokeRpc msg) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
-        }
-        final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
-                null, msg.getRpc(), msg.getIdentifier());
-        final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
-
-        final scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
-
-        final ActorRef sender = getSender();
-        final ActorRef self = self();
-
-        final OnComplete<Object> onComplete = new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object reply) throws Throwable {
-                if(failure != null) {
-                    LOG.error("FindRouters failed", failure);
-                    sender.tell(new akka.actor.Status.Failure(failure), self);
-                    return;
-                }
-
-                final RpcRegistry.Messages.FindRoutersReply findReply =
-                                                (RpcRegistry.Messages.FindRoutersReply)reply;
-
-                final List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
-
-                if(actorRefList == null || actorRefList.isEmpty()) {
-                    sender.tell(new akka.actor.Status.Failure(new DOMRpcImplementationNotAvailableException(
-                            "No remote implementation available for rpc %s", msg.getRpc())), self);
-                    return;
-                }
-                finishInvokeRpc(actorRefList, msg, sender, self);
-            }
-        };
-
-        future.onComplete(onComplete, getContext().dispatcher());
-    }
-
-    protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
-            final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
-
-        final RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
-
-        final Node serializedNode = NormalizedNodeSerializer.serialize(msg.getInput());
-        final ExecuteRpc executeMsg = new ExecuteRpc(serializedNode, msg.getRpc());
-
-        final scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
-
-        final OnComplete<Object> onComplete = new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object reply) throws Throwable {
-                if(failure != null) {
-                    LOG.error("ExecuteRpc failed", failure);
-                    sender.tell(new akka.actor.Status.Failure(failure), self);
-                    return;
-                }
-
-                LOG.debug("Execute Rpc response received for rpc : {}, responding to sender : {}", msg.getRpc(), sender);
-
-                sender.tell(reply, self);
-            }
-        };
-
-        future.onComplete(onComplete, getContext().dispatcher());
-    }
-
     private void executeRpc(final ExecuteRpc msg) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Executing rpc {}", msg.getRpc());
-        }
-        final NormalizedNode<?, ?> input = NormalizedNodeSerializer.deSerialize(msg.getInputNormalizedNode());
+        LOG.debug("Executing rpc {}", msg.getRpc());
+        final NormalizedNode<?, ?> input = RemoteRpcInput.from(msg.getInputNormalizedNode());
         final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
-
-        final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
-
-        final ListenableFuture<DOMRpcResult> listenableFuture =
-                JdkFutureAdapters.listenInPoolThread(future);
-
         final ActorRef sender = getSender();
         final ActorRef self = self();
 
-        Futures.addCallback(listenableFuture, new FutureCallback<DOMRpcResult>() {
-            @Override
-            public void onSuccess(final DOMRpcResult result) {
-                if (result.getErrors() != null && ( ! result.getErrors().isEmpty())) {
-                    final String message = String.format("Execution of RPC %s failed",  msg.getRpc());
-                    Collection<RpcError> errors = result.getErrors();
-                    if(errors == null || errors.size() == 0) {
-                        errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
-                                null, message));
-                    }
+        try {
+            final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
 
-                    sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
-                            message, errors)), self);
-                } else {
-                    final Node serializedResultNode;
-                    if(result.getResult() == null){
-                        serializedResultNode = null;
+            Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
+                @Override
+                public void onSuccess(final DOMRpcResult result) {
+                    if (result.getErrors() != null && (!result.getErrors().isEmpty())) {
+                        final String message = String.format("Execution of RPC %s failed", msg.getRpc());
+                        Collection<RpcError> errors = result.getErrors();
+                        if (errors == null || errors.size() == 0) {
+                            errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, null, message));
+                        }
+
+                        sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, errors)), self);
                     } else {
-                        serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
-                    }
+                        final Node serializedResultNode;
+                        if (result.getResult() == null) {
+                            serializedResultNode = null;
+                        } else {
+                            serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+                        }
 
-                    LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
+                        LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
 
-                    sender.tell(new RpcResponse(serializedResultNode), self);
+                        sender.tell(new RpcResponse(serializedResultNode), self);
+                    }
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
+                    sender.tell(new akka.actor.Status.Failure(t), self);
                 }
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
-                sender.tell(new akka.actor.Status.Failure(t), self);
-            }
-        });
+            });
+        } catch (final Exception e) {
+            sender.tell(new akka.actor.Status.Failure(e), sender);
+        }
     }
 
     private static class RpcBrokerCreator implements Creator<RpcBroker> {
         private static final long serialVersionUID = 1L;
 
         final DOMRpcService rpcService;
-        final ActorRef rpcRegistry;
 
-        RpcBrokerCreator(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+        RpcBrokerCreator(final DOMRpcService rpcService) {
             this.rpcService = rpcService;
-            this.rpcRegistry = rpcRegistry;
         }
 
         @Override
         public RpcBroker create() throws Exception {
-            return new RpcBroker(rpcService, rpcRegistry);
+            return new RpcBroker(rpcService);
         }
     }
 }
index 7e4d8a0..8faa331 100644 (file)
@@ -12,7 +12,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
@@ -23,7 +23,7 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
  *
  * @author Thomas Pantelis
  */
-public class RpcErrorsException extends Exception {
+public class RpcErrorsException extends DOMRpcException {
 
     private static final long serialVersionUID = 1L;
 
@@ -38,8 +38,8 @@ public class RpcErrorsException extends Exception {
         final String info;
         final Throwable cause;
 
-        RpcErrorData(ErrorSeverity severity, ErrorType errorType, String tag,
-                String applicationTag, String message, String info, Throwable cause) {
+        RpcErrorData(final ErrorSeverity severity, final ErrorType errorType, final String tag,
+                final String applicationTag, final String message, final String info, final Throwable cause) {
             this.severity = severity;
             this.errorType = errorType;
             this.tag = tag;
@@ -52,10 +52,10 @@ public class RpcErrorsException extends Exception {
 
     private final List<RpcErrorData> rpcErrorDataList = new ArrayList<>();
 
-    public RpcErrorsException(String message, Iterable<RpcError> rpcErrors) {
+    public RpcErrorsException(final String message, final Iterable<RpcError> rpcErrors) {
         super(message);
 
-        for(RpcError rpcError: rpcErrors) {
+        for(final RpcError rpcError: rpcErrors) {
             rpcErrorDataList.add(new RpcErrorData(rpcError.getSeverity(), rpcError.getErrorType(),
                     rpcError.getTag(), rpcError.getApplicationTag(), rpcError.getMessage(),
                     rpcError.getInfo(), rpcError.getCause()));
@@ -63,9 +63,9 @@ public class RpcErrorsException extends Exception {
     }
 
     public Collection<RpcError> getRpcErrors() {
-        Collection<RpcError> rpcErrors = new ArrayList<>();
-        for(RpcErrorData ed: rpcErrorDataList) {
-            RpcError rpcError = ed.severity == ErrorSeverity.ERROR ?
+        final Collection<RpcError> rpcErrors = new ArrayList<>();
+        for(final RpcErrorData ed: rpcErrorDataList) {
+            final RpcError rpcError = ed.severity == ErrorSeverity.ERROR ?
                     RpcResultBuilder.newError(ed.errorType, ed.tag, ed.message, ed.applicationTag,
                             ed.info, ed.cause) :
                     RpcResultBuilder.newWarning(ed.errorType, ed.tag, ed.message, ed.applicationTag,
index 461bd00..1ade84b 100644 (file)
@@ -82,7 +82,7 @@ public class RpcManager extends AbstractUntypedActor {
                     withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
 
         rpcBroker =
-                getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry).
+                getContext().actorOf(RpcBroker.props(rpcServices).
                     withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
 
         final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
@@ -93,7 +93,7 @@ public class RpcManager extends AbstractUntypedActor {
         LOG.debug("Registers rpc listeners");
 
         rpcListener = new RpcListener(rpcRegistry);
-        rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
+        rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
 
         rpcServices.registerRpcListener(rpcListener);
 
@@ -102,10 +102,10 @@ public class RpcManager extends AbstractUntypedActor {
     }
 
     private void registerRoutedRpcDelegate() {
-        Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
-        Set<Module> modules = schemaContext.getModules();
-        for(Module module : modules){
-            for(RpcDefinition rpcDefinition : module.getRpcs()){
+        final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
+        final Set<Module> modules = schemaContext.getModules();
+        for(final Module module : modules){
+            for(final RpcDefinition rpcDefinition : module.getRpcs()){
                 if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
                     LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
                     rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
@@ -125,7 +125,9 @@ public class RpcManager extends AbstractUntypedActor {
         for (final RpcDefinition rpcDef : currentlySupportedRpc) {
             rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
         }
-        rpcListener.onRpcAvailable(rpcs);
+        if(!rpcs.isEmpty()) {
+            rpcListener.onRpcAvailable(rpcs);
+        }
     }
 
 
index 66c0c1b..55f8785 100644 (file)
@@ -8,19 +8,27 @@
 package org.opendaylight.controller.remote.rpc.messages;
 
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+/**
+ * @author tony
+ *
+ */
 public class ExecuteRpc implements Serializable {
     private static final long serialVersionUID = 1128904894827335676L;
 
     private final NormalizedNodeMessages.Node inputNormalizedNode;
     private final QName rpc;
 
-    public ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) {
-        Preconditions.checkNotNull(inputNormalizedNode, "Normalized Node input string should be present");
+    private ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) {
         Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
 
         this.inputNormalizedNode = inputNormalizedNode;
@@ -34,4 +42,22 @@ public class ExecuteRpc implements Serializable {
     public QName getRpc() {
         return rpc;
     }
+
+    public static ExecuteRpc from(final DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
+        final Node serializedInput;
+        if(input != null) {
+            serializedInput = NormalizedNodeSerializer.serialize(input);
+        } else {
+            serializedInput = null;
+        }
+        return new ExecuteRpc(serializedInput, rpc.getType().getLastComponent());
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("rpc", rpc)
+                .add("normalizedNode", inputNormalizedNode)
+                .toString();
+    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java
deleted file mode 100644 (file)
index a7fbe83..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc.messages;
-
-import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class InvokeRpc implements Serializable {
-    private static final long serialVersionUID = -2813459607858108953L;
-
-    private final QName rpc;
-    private final YangInstanceIdentifier identifier;
-    private final NormalizedNode<?,?> input;
-
-    public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final NormalizedNode<?,?> input) {
-        Preconditions.checkNotNull(rpc, "rpc qname should not be null");
-        Preconditions.checkNotNull(input, "rpc input should not be null");
-
-        this.rpc = rpc;
-        this.identifier = identifier;
-        this.input = input;
-    }
-
-    public QName getRpc() {
-        return rpc;
-    }
-
-    public YangInstanceIdentifier getIdentifier() {
-        return identifier;
-    }
-
-    public NormalizedNode<?,?> getInput() {
-        return input;
-    }
-}
index afe81a8..f88b0c6 100644 (file)
@@ -9,25 +9,37 @@
 package org.opendaylight.controller.remote.rpc;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import java.io.File;
 import java.net.URI;
 import java.util.Arrays;
+import java.util.Collection;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.mockito.Mockito;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
 /**
@@ -45,21 +57,31 @@ public class AbstractRpcTest {
     static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output");
     static final QName TEST_RPC_OUTPUT_DATA = new QName(TEST_URI, "output-data");
 
+
+    static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC);
+    static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(TEST_RPC));
+    static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
+
     static ActorSystem node1;
     static ActorSystem node2;
+    static RemoteRpcProviderConfig config1;
+    static RemoteRpcProviderConfig config2;
 
     protected ActorRef rpcBroker1;
-    protected JavaTestKit probeReg1;
+    protected JavaTestKit rpcRegistry1Probe;
     protected ActorRef rpcBroker2;
-    protected JavaTestKit probeReg2;
+    protected JavaTestKit rpcRegistry2Probe;
     protected Broker.ProviderSession brokerSession;
     protected SchemaContext schemaContext;
-    protected DOMRpcService rpcService;
+    protected RemoteRpcImplementation remoteRpcImpl1;
+    protected RemoteRpcImplementation remoteRpcImpl2;
+    protected DOMRpcService domRpcService1;
+    protected DOMRpcService domRpcService2;
 
     @BeforeClass
     public static void setup() throws InterruptedException {
-        final RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
-        final RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+        config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+        config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
         node1 = ActorSystem.create("opendaylight-rpc", config1.get());
         node2 = ActorSystem.create("opendaylight-rpc", config2.get());
     }
@@ -77,13 +99,15 @@ public class AbstractRpcTest {
         schemaContext = new YangParserImpl().parseFiles(Arrays.asList(
                 new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath())));
 
-        brokerSession = Mockito.mock(Broker.ProviderSession.class);
-        rpcService = Mockito.mock(DOMRpcService.class);
+        domRpcService1 = Mockito.mock(DOMRpcService.class);
+        domRpcService2 = Mockito.mock(DOMRpcService.class);
+        rpcRegistry1Probe = new JavaTestKit(node1);
+        rpcBroker1 = node1.actorOf(RpcBroker.props(domRpcService1));
+        rpcRegistry2Probe = new JavaTestKit(node2);
+        rpcBroker2 = node2.actorOf(RpcBroker.props(domRpcService2));
+        remoteRpcImpl1 = new RemoteRpcImplementation(rpcRegistry1Probe.getRef(), config1);
+        remoteRpcImpl2 = new RemoteRpcImplementation(rpcRegistry2Probe.getRef(), config2);
 
-        probeReg1 = new JavaTestKit(node1);
-        rpcBroker1 = node1.actorOf(RpcBroker.props(rpcService, probeReg1.getRef()));
-        probeReg2 = new JavaTestKit(node2);
-        rpcBroker2 = node2.actorOf(RpcBroker.props(rpcService, probeReg2.getRef()));
 
     }
 
@@ -104,6 +128,38 @@ public class AbstractRpcTest {
         }
     }
 
+    static void assertCompositeNodeEquals(final NormalizedNode<? , ?> exp, final NormalizedNode<? , ? > actual) {
+        assertEquals(exp, actual);
+    }
+
+    static ContainerNode makeRPCInput(final String data) {
+        return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT))
+            .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build();
+
+    }
+
+    static ContainerNode makeRPCOutput(final String data) {
+        return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_OUTPUT))
+                .withChild(ImmutableNodes.leafNode(TEST_RPC_OUTPUT, data)).build();
+    }
+
+    static void assertFailedRpcResult(final DOMRpcResult rpcResult, final ErrorSeverity severity,
+            final ErrorType errorType, final String tag, final String message, final String applicationTag, final String info,
+            final String causeMsg) {
+
+        assertNotNull("RpcResult was null", rpcResult);
+        final Collection<RpcError> rpcErrors = rpcResult.getErrors();
+        assertEquals("RpcErrors count", 1, rpcErrors.size());
+        assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message,
+                applicationTag, info, causeMsg);
+    }
+
+    static void assertSuccessfulRpcResult(final DOMRpcResult rpcResult,
+            final NormalizedNode<? , ?> expOutput) {
+        assertNotNull("RpcResult was null", rpcResult);
+        assertCompositeNodeEquals(expOutput, rpcResult.getResult());
+    }
+
     static class TestException extends Exception {
         private static final long serialVersionUID = 1L;
 
index 2026d48..b3ed7ff 100644 (file)
@@ -8,6 +8,34 @@
 
 package org.opendaylight.controller.remote.rpc;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.japi.Pair;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 /***
  * Unit tests for RemoteRpcImplementation.
@@ -17,7 +45,201 @@ package org.opendaylight.controller.remote.rpc;
 public class RemoteRpcImplementationTest extends AbstractRpcTest {
 
 
-    private RemoteRpcProviderConfig getConfig(){
+
+    @Test(expected = DOMRpcImplementationNotAvailableException.class)
+    public void testInvokeRpcWithNoRemoteActor() throws Exception {
+        final ContainerNode input = makeRPCInput("foo");
+        final CheckedFuture<DOMRpcResult, DOMRpcException> failedFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, input);
+        rpcRegistry1Probe.expectMsgClass(JavaTestKit.duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
+        rpcRegistry1Probe
+                .reply(new RpcRegistry.Messages.FindRoutersReply(Collections.<Pair<ActorRef, Long>>emptyList()));
+        failedFuture.checkedGet(5, TimeUnit.SECONDS);
+    }
+
+
+    /**
+     * This test method invokes and executes the remote rpc
+     */
+    @Test
+    public void testInvokeRpc() throws Exception {
+        final ContainerNode rpcOutput = makeRPCOutput("bar");
+        final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+
+        final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+
+        when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
+                Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<ActorRef, Long>(
+                rpcBroker2, 200L))));
+
+        final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+        assertEquals(rpcOutput, result.getResult());
+    }
+
+    /**
+     * This test method invokes and executes the remote rpc
+     */
+    @Test
+    public void testInvokeRpcWithNullInput() throws Exception {
+        final ContainerNode rpcOutput = makeRPCOutput("bar");
+        final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+
+        when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
+                Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, null);
+        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<ActorRef, Long>(
+                rpcBroker2, 200L))));
+
+        final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+        assertEquals(rpcOutput, result.getResult());
+    }
+
+
+    /**
+     * This test method invokes and executes the remote rpc
+     */
+    @Test
+    public void testInvokeRpcWithNoOutput() throws Exception {
+        final ContainerNode rpcOutput = null;
+        final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+
+        final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+
+        when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
+                Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<ActorRef, Long>(
+                rpcBroker2, 200L))));
+
+        final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+        assertNull(result.getResult());
+    }
+
+
+    /**
+     * This test method invokes and executes the remote rpc
+     */
+    @Test(expected = DOMRpcException.class)
+    public void testInvokeRpcWithRemoteFailedFuture() throws Exception {
+        final ContainerNode rpcOutput = null;
+        final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+
+        final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+
+        when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
+                Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcException(
+                        "Test Exception") {}));
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<ActorRef, Long>(
+                rpcBroker2, 200L))));
+        frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * This test method invokes and tests exceptions when akka timeout occured
+     *
+     * Currently ignored since this test with current config takes around 15 seconds
+     * to complete.
+     *
+     */
+    @Ignore
+    @Test(expected = RemoteDOMRpcException.class)
+    public void testInvokeRpcWithAkkaTimeoutException() throws Exception {
+        final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+        frontEndFuture.checkedGet(20, TimeUnit.SECONDS);
+    }
+
+    /**
+     * This test method invokes remote rpc and lookup failed
+     * with runtime exception.
+     */
+    @Test(expected = DOMRpcException.class)
+    public void testInvokeRpcWithLookupException() throws Exception {
+        final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+        rpcRegistry1Probe.reply( new Status.Failure(new RuntimeException("test")));
+        frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * This test method invokes and executes the remote rpc
+     */
+    @Test(expected = DOMRpcImplementationNotAvailableException.class)
+    public void testInvokeRpcWithLoopException() throws Exception {
+        final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(NormalizedNodeSerializer.serialize(makeRPCInput("foo")));
+        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+
+        frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+    }
+
+
+    private RemoteRpcProviderConfig getConfig() {
         return new RemoteRpcProviderConfig.Builder("unit-test").build();
     }
 }
index 5cd3df3..78a368f 100644 (file)
 package org.opendaylight.controller.remote.rpc;
 
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
+import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 public class RemoteRpcProviderTest {
 
@@ -35,4 +49,22 @@ public class RemoteRpcProviderTest {
     system = null;
   }
 
+  @Test
+  public void testRemoteRpcProvider() throws Exception {
+    final RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class));
+    final Broker.ProviderSession session = mock(Broker.ProviderSession.class);
+    final SchemaService schemaService = mock(SchemaService.class);
+    when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
+    when(session.getService(SchemaService.class)).thenReturn(schemaService);
+    when(session.getService(DOMRpcService.class)).thenReturn(mock(DOMRpcService.class));
+
+    rpcProvider.onSessionInitiated(session);
+
+    final ActorRef actorRef = Await.result(
+            system.actorSelection(
+                    moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
+                                                                 Duration.create(2, TimeUnit.SECONDS));
+
+    Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
+  }
 }
index 16b1391..6ecd7da 100644 (file)
@@ -9,8 +9,71 @@
 package org.opendaylight.controller.remote.rpc;
 
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
+import akka.actor.Status.Failure;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class RpcBrokerTest extends AbstractRpcTest {
 
+    @Test
+    public void testExecuteRpc() {
+        new JavaTestKit(node1) {
+            {
+
+                final ContainerNode invokeRpcResult = makeRPCOutput("bar");
+                final DOMRpcResult rpcResult = new DefaultDOMRpcResult(invokeRpcResult);
+                when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.<NormalizedNode<?, ?>>any())).thenReturn(
+                        Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+
+                final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
+
+                rpcBroker1.tell(executeMsg, getRef());
+
+                final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+                assertEquals(rpcResult.getResult(),
+                        NormalizedNodeSerializer.deSerialize(rpcResponse.getResultNormalizedNode()));
+            }
+        };
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithException() {
+
+        new JavaTestKit(node1) {
+            {
+
+                when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.<NormalizedNode<?, ?>>any()))
+                        .thenReturn(
+                                Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException(
+                                        "NOT FOUND")));
+
+                final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
+
+                rpcBroker1.tell(executeMsg, getRef());
+
+                final Failure rpcResponse = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+                Assert.assertTrue(rpcResponse.cause() instanceof DOMRpcException);
+            }
+        };
+
+    }
 
 }
index ecfaef8..37d81c7 100644 (file)
@@ -8,26 +8,70 @@
 
 package org.opendaylight.controller.remote.rpc;
 
+
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.ConfigFactory;
+import java.net.URISyntaxException;
+import java.util.Collections;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 public class RpcListenerTest {
 
-  static ActorSystem system;
+    private static final QName TEST_QNAME = QName.create("test", "2015-06-12", "test");
+    private static final SchemaPath RPC_TYPE = SchemaPath.create(true, TEST_QNAME);
+    private static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier
+            .create(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME));
+    private static final DOMRpcIdentifier RPC_ID = DOMRpcIdentifier.create(RPC_TYPE, TEST_PATH);
+    static ActorSystem system;
+
+
+    @BeforeClass
+    public static void setup() throws InterruptedException {
+        system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
+    }
+
+    @AfterClass
+    public static void teardown() {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
 
+    @Test
+    public void testRouteAdd() throws URISyntaxException, InterruptedException {
+        new JavaTestKit(system) {
+            {
+                // Test announcements
+                final JavaTestKit probeReg = new JavaTestKit(system);
+                final ActorRef rpcRegistry = probeReg.getRef();
 
-  @BeforeClass
-  public static void setup() throws InterruptedException {
-    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
-  }
+                final RpcListener rpcListener = new RpcListener(rpcRegistry);
+                rpcListener.onRpcAvailable(Collections.singleton(RPC_ID));
+                probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class);
+            }
+        };
+    }
 
-  @AfterClass
-  public static void teardown() {
-    JavaTestKit.shutdownActorSystem(system);
-    system = null;
-  }
+    @Test
+    public void testRouteRemove() throws URISyntaxException, InterruptedException {
+        new JavaTestKit(system) {
+            {
+                // Test announcements
+                final JavaTestKit probeReg = new JavaTestKit(system);
+                final ActorRef rpcRegistry = probeReg.getRef();
 
+                final RpcListener rpcListener = new RpcListener(rpcRegistry);
+                rpcListener.onRpcUnavailable(Collections.singleton(RPC_ID));
+                probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class);
+            }
+        };
+    }
 }