From 168591eb3dfc298b631e8dc51e25b0331c105cfc Mon Sep 17 00:00:00 2001 From: Tony Tkacik Date: Thu, 11 Jun 2015 17:19:46 +0200 Subject: [PATCH] Removed uncessary calls to RpcBroker to find routes. Moved invokeRpc part to RemoteRpcImplementation which allowed to do ask for lookups in RemoteRpcImplementation. This changed role of RpcBroker part to only delegate to MD-SAL, if RPC was received via Akka. remote.rpc.RpcBroker interaction model represented multi-stepped pipeline which resulted in following message pattern RemoteRpcImplementaion -> RpcBroker#InvokeRpc -> RpcRegistry#FindRoutes -> RpcBroker#ExecuteRpc InvokeRpc only did lookup in FindRoutes and all outgoing messages needed to pass via RpcBroker. Unfortunatelly this also prevented lookup of any RPC Path during executing RPC in MD-SAL. Change-Id: I6e84bfcb74b71f7417c2d3f8c35a7f8b0406caf9 Signed-off-by: Tony Tkacik (cherry picked from commit 9216287a4d1fc310f81f1956685f4e6deb7eefa8) --- .../remote/rpc/RemoteDOMRpcFuture.java | 34 ++- .../remote/rpc/RemoteRpcImplementation.java | 66 +++++- .../controller/remote/rpc/RemoteRpcInput.java | 83 +++++++ .../controller/remote/rpc/RpcBroker.java | 173 ++++---------- .../remote/rpc/RpcErrorsException.java | 18 +- .../controller/remote/rpc/RpcManager.java | 16 +- .../remote/rpc/messages/ExecuteRpc.java | 30 ++- .../remote/rpc/messages/InvokeRpc.java | 43 ---- .../remote/rpc/AbstractRpcTest.java | 78 +++++- .../rpc/RemoteRpcImplementationTest.java | 224 +++++++++++++++++- .../remote/rpc/RemoteRpcProviderTest.java | 32 +++ .../controller/remote/rpc/RpcBrokerTest.java | 63 +++++ .../remote/rpc/RpcListenerTest.java | 64 ++++- 13 files changed, 692 insertions(+), 232 deletions(-) create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java delete mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java index e97a499fdb..c6b796d26d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.remote.rpc; import akka.dispatch.OnComplete; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.CheckedFuture; @@ -19,22 +20,38 @@ import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; +/** + * @author tony + * + */ class RemoteDOMRpcFuture extends AbstractFuture implements CheckedFuture { private static final Logger LOG = LoggerFactory.getLogger(RemoteDOMRpcFuture.class); - private RemoteDOMRpcFuture(final Future future) { - future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global()); + private final QName rpcName; + + private RemoteDOMRpcFuture(final QName rpcName) { + this.rpcName = Preconditions.checkNotNull(rpcName,"rpcName"); } - public static CheckedFuture from(final Future future) { - return new RemoteDOMRpcFuture(future); + public static RemoteDOMRpcFuture create(final QName rpcName) { + return new RemoteDOMRpcFuture(rpcName); + } + + protected void failNow(final Throwable error) { + LOG.debug("Failing future {} for rpc {}", this, rpcName, error); + setException(error); + } + + protected void completeWith(final Future future) { + future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global()); } @Override @@ -72,20 +89,21 @@ class RemoteDOMRpcFuture extends AbstractFuture implements Checked @Override public void onComplete(final Throwable error, final Object reply) throws Throwable { if (error != null) { - RemoteDOMRpcFuture.this.setException(error); + RemoteDOMRpcFuture.this.failNow(error); } else if (reply instanceof RpcResponse) { final RpcResponse rpcReply = (RpcResponse) reply; final NormalizedNode result; if (rpcReply.getResultNormalizedNode() == null) { result = null; - LOG.debug("Received response for invoke rpc: result is null"); + LOG.debug("Received response for rpc {}: result is null", rpcName); } else { result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode()); - LOG.debug("Received response for invoke rpc: result is {}", result); + LOG.debug("Received response for rpc {}: result is {}", rpcName, result); } RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result)); + LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName); } - RemoteDOMRpcFuture.this.setException(new IllegalStateException("Incorrect reply type " + reply + RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply + "from Akka")); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index 404a109741..2886fd9879 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -3,27 +3,77 @@ package org.opendaylight.controller.remote.rpc; import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.japi.Pair; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import java.util.List; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; +import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply; +import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; +import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; public class RemoteRpcImplementation implements DOMRpcImplementation { - private final ActorRef rpcBroker; + private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class); + + private final ActorRef rpcRegistry; private final RemoteRpcProviderConfig config; - public RemoteRpcImplementation(final ActorRef rpcBroker, final RemoteRpcProviderConfig config) { - this.rpcBroker = rpcBroker; + public RemoteRpcImplementation(final ActorRef rpcRegistry, final RemoteRpcProviderConfig config) { this.config = config; + this.rpcRegistry = rpcRegistry; } @Override - public CheckedFuture invokeRpc(final DOMRpcIdentifier rpc, final NormalizedNode input) { - final InvokeRpc rpcMsg = new InvokeRpc(rpc.getType().getLastComponent(), rpc.getContextReference(), input); - final scala.concurrent.Future future = ask(rpcBroker, rpcMsg, config.getAskDuration()); - return RemoteDOMRpcFuture.from(future); + public CheckedFuture invokeRpc(final DOMRpcIdentifier rpc, + final NormalizedNode input) { + if (input instanceof RemoteRpcInput) { + LOG.warn("Rpc {} was removed during execution or there is loop present. Failing received rpc.", rpc); + return Futures + .immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException( + "Rpc implementation for {} was removed during processing.", rpc)); + } + final RemoteDOMRpcFuture frontEndFuture = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent()); + findRouteAsync(rpc).onComplete(new OnComplete() { + + @Override + public void onComplete(final Throwable error, final FindRoutersReply routes) throws Throwable { + if (error != null) { + frontEndFuture.failNow(error); + } else { + final List> routePairs = routes.getRouterWithUpdateTime(); + if (routePairs == null || routePairs.isEmpty()) { + frontEndFuture.failNow(new DOMRpcImplementationNotAvailableException( + "No local or remote implementation available for rpc %s", rpc.getType(), error)); + } else { + final ActorRef remoteImplRef = new LatestEntryRoutingLogic(routePairs).select(); + final Object executeRpcMessage = ExecuteRpc.from(rpc, input); + LOG.debug("Found remote actor {} for rpc {} - sending {}", remoteImplRef, rpc.getType(), executeRpcMessage); + frontEndFuture.completeWith(ask(remoteImplRef, executeRpcMessage, config.getAskDuration())); + } + } + } + }, ExecutionContext.Implicits$.MODULE$.global()); + return frontEndFuture; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private Future findRouteAsync(final DOMRpcIdentifier rpc) { + // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly. + final RpcRouter.RouteIdentifier routeId = + new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference()); + final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); + return (Future) ask(rpcRegistry, findMsg, config.getAskDuration()); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java new file mode 100644 index 0000000000..3c77220972 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + + +class RemoteRpcInput implements ContainerNode { + + private final ContainerNode delegate; + + private RemoteRpcInput(final ContainerNode delegate) { + this.delegate = delegate; + } + + protected static RemoteRpcInput from(final Node node) { + if(node == null) { + return null; + } + final NormalizedNode deserialized = NormalizedNodeSerializer.deSerialize(node); + Preconditions.checkArgument(deserialized instanceof ContainerNode); + return new RemoteRpcInput((ContainerNode) deserialized); + } + + ContainerNode delegate() { + return delegate; + } + + @Override + public Map getAttributes() { + return delegate().getAttributes(); + } + + @Override + public Object getAttributeValue(final QName name) { + return delegate().getAttributeValue(name); + } + + @Override + public QName getNodeType() { + return delegate().getNodeType(); + } + + @Override + public Collection> getValue() { + return delegate().getValue(); + } + + @Override + public NodeIdentifier getIdentifier() { + return delegate().getIdentifier(); + } + + @Override + public Optional> getChild(final PathArgument child) { + return delegate().getChild(child); + } + + @Override + public int hashCode() { + return delegate().hashCode(); + } + + @Override + public boolean equals(final Object obj) { + return delegate().equals(obj); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index d88bb88017..0c5315c3dc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -8,36 +8,23 @@ package org.opendaylight.controller.remote.rpc; -import static akka.pattern.Patterns.ask; - import akka.actor.ActorRef; import akka.actor.Props; -import akka.dispatch.OnComplete; import akka.japi.Creator; -import akka.japi.Pair; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; -import com.google.common.util.concurrent.ListenableFuture; import java.util.Arrays; import java.util.Collection; -import java.util.List; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; -import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; -import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; -import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -53,162 +40,82 @@ import org.slf4j.LoggerFactory; public class RpcBroker extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class); - private final ActorRef rpcRegistry; - private final RemoteRpcProviderConfig config; private final DOMRpcService rpcService; - private RpcBroker(final DOMRpcService rpcService, final ActorRef rpcRegistry) { + private RpcBroker(final DOMRpcService rpcService) { this.rpcService = rpcService; - this.rpcRegistry = rpcRegistry; - config = new RemoteRpcProviderConfig(getContext().system().settings().config()); } - public static Props props(final DOMRpcService rpcService, final ActorRef rpcRegistry) { - Preconditions.checkNotNull(rpcRegistry, "ActorRef can not be null!"); + public static Props props(final DOMRpcService rpcService) { Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null"); - return Props.create(new RpcBrokerCreator(rpcService, rpcRegistry)); + return Props.create(new RpcBrokerCreator(rpcService)); } @Override protected void handleReceive(final Object message) throws Exception { - if(message instanceof InvokeRpc) { - invokeRemoteRpc((InvokeRpc) message); - } else if(message instanceof ExecuteRpc) { + if (message instanceof ExecuteRpc) { executeRpc((ExecuteRpc) message); } } - private void invokeRemoteRpc(final InvokeRpc msg) { - if(LOG.isDebugEnabled()) { - LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); - } - final RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl( - null, msg.getRpc(), msg.getIdentifier()); - final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); - - final scala.concurrent.Future future = ask(rpcRegistry, findMsg, config.getAskDuration()); - - final ActorRef sender = getSender(); - final ActorRef self = self(); - - final OnComplete onComplete = new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object reply) throws Throwable { - if(failure != null) { - LOG.error("FindRouters failed", failure); - sender.tell(new akka.actor.Status.Failure(failure), self); - return; - } - - final RpcRegistry.Messages.FindRoutersReply findReply = - (RpcRegistry.Messages.FindRoutersReply)reply; - - final List> actorRefList = findReply.getRouterWithUpdateTime(); - - if(actorRefList == null || actorRefList.isEmpty()) { - sender.tell(new akka.actor.Status.Failure(new DOMRpcImplementationNotAvailableException( - "No remote implementation available for rpc %s", msg.getRpc())), self); - return; - } - finishInvokeRpc(actorRefList, msg, sender, self); - } - }; - - future.onComplete(onComplete, getContext().dispatcher()); - } - - protected void finishInvokeRpc(final List> actorRefList, - final InvokeRpc msg, final ActorRef sender, final ActorRef self) { - - final RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList); - - final Node serializedNode = NormalizedNodeSerializer.serialize(msg.getInput()); - final ExecuteRpc executeMsg = new ExecuteRpc(serializedNode, msg.getRpc()); - - final scala.concurrent.Future future = ask(logic.select(), executeMsg, config.getAskDuration()); - - final OnComplete onComplete = new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object reply) throws Throwable { - if(failure != null) { - LOG.error("ExecuteRpc failed", failure); - sender.tell(new akka.actor.Status.Failure(failure), self); - return; - } - - LOG.debug("Execute Rpc response received for rpc : {}, responding to sender : {}", msg.getRpc(), sender); - - sender.tell(reply, self); - } - }; - - future.onComplete(onComplete, getContext().dispatcher()); - } - private void executeRpc(final ExecuteRpc msg) { - if(LOG.isDebugEnabled()) { - LOG.debug("Executing rpc {}", msg.getRpc()); - } - final NormalizedNode input = NormalizedNodeSerializer.deSerialize(msg.getInputNormalizedNode()); + LOG.debug("Executing rpc {}", msg.getRpc()); + final NormalizedNode input = RemoteRpcInput.from(msg.getInputNormalizedNode()); final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc()); - - final CheckedFuture future = rpcService.invokeRpc(schemaPath, input); - - final ListenableFuture listenableFuture = - JdkFutureAdapters.listenInPoolThread(future); - final ActorRef sender = getSender(); final ActorRef self = self(); - Futures.addCallback(listenableFuture, new FutureCallback() { - @Override - public void onSuccess(final DOMRpcResult result) { - if (result.getErrors() != null && ( ! result.getErrors().isEmpty())) { - final String message = String.format("Execution of RPC %s failed", msg.getRpc()); - Collection errors = result.getErrors(); - if(errors == null || errors.size() == 0) { - errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, - null, message)); - } + try { + final CheckedFuture future = rpcService.invokeRpc(schemaPath, input); - sender.tell(new akka.actor.Status.Failure(new RpcErrorsException( - message, errors)), self); - } else { - final Node serializedResultNode; - if(result.getResult() == null){ - serializedResultNode = null; + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final DOMRpcResult result) { + if (result.getErrors() != null && (!result.getErrors().isEmpty())) { + final String message = String.format("Execution of RPC %s failed", msg.getRpc()); + Collection errors = result.getErrors(); + if (errors == null || errors.size() == 0) { + errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, null, message)); + } + + sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, errors)), self); } else { - serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult()); - } + final Node serializedResultNode; + if (result.getResult() == null) { + serializedResultNode = null; + } else { + serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult()); + } - LOG.debug("Sending response for execute rpc : {}", msg.getRpc()); + LOG.debug("Sending response for execute rpc : {}", msg.getRpc()); - sender.tell(new RpcResponse(serializedResultNode), self); + sender.tell(new RpcResponse(serializedResultNode), self); + } + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t); + sender.tell(new akka.actor.Status.Failure(t), self); } - } - - @Override - public void onFailure(final Throwable t) { - LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t); - sender.tell(new akka.actor.Status.Failure(t), self); - } - }); + }); + } catch (final Exception e) { + sender.tell(new akka.actor.Status.Failure(e), sender); + } } private static class RpcBrokerCreator implements Creator { private static final long serialVersionUID = 1L; final DOMRpcService rpcService; - final ActorRef rpcRegistry; - RpcBrokerCreator(final DOMRpcService rpcService, final ActorRef rpcRegistry) { + RpcBrokerCreator(final DOMRpcService rpcService) { this.rpcService = rpcService; - this.rpcRegistry = rpcRegistry; } @Override public RpcBroker create() throws Exception { - return new RpcBroker(rpcService, rpcRegistry); + return new RpcBroker(rpcService); } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java index 7e4d8a034e..8faa331e6f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java @@ -12,7 +12,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.List; - +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; @@ -23,7 +23,7 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder; * * @author Thomas Pantelis */ -public class RpcErrorsException extends Exception { +public class RpcErrorsException extends DOMRpcException { private static final long serialVersionUID = 1L; @@ -38,8 +38,8 @@ public class RpcErrorsException extends Exception { final String info; final Throwable cause; - RpcErrorData(ErrorSeverity severity, ErrorType errorType, String tag, - String applicationTag, String message, String info, Throwable cause) { + RpcErrorData(final ErrorSeverity severity, final ErrorType errorType, final String tag, + final String applicationTag, final String message, final String info, final Throwable cause) { this.severity = severity; this.errorType = errorType; this.tag = tag; @@ -52,10 +52,10 @@ public class RpcErrorsException extends Exception { private final List rpcErrorDataList = new ArrayList<>(); - public RpcErrorsException(String message, Iterable rpcErrors) { + public RpcErrorsException(final String message, final Iterable rpcErrors) { super(message); - for(RpcError rpcError: rpcErrors) { + for(final RpcError rpcError: rpcErrors) { rpcErrorDataList.add(new RpcErrorData(rpcError.getSeverity(), rpcError.getErrorType(), rpcError.getTag(), rpcError.getApplicationTag(), rpcError.getMessage(), rpcError.getInfo(), rpcError.getCause())); @@ -63,9 +63,9 @@ public class RpcErrorsException extends Exception { } public Collection getRpcErrors() { - Collection rpcErrors = new ArrayList<>(); - for(RpcErrorData ed: rpcErrorDataList) { - RpcError rpcError = ed.severity == ErrorSeverity.ERROR ? + final Collection rpcErrors = new ArrayList<>(); + for(final RpcErrorData ed: rpcErrorDataList) { + final RpcError rpcError = ed.severity == ErrorSeverity.ERROR ? RpcResultBuilder.newError(ed.errorType, ed.tag, ed.message, ed.applicationTag, ed.info, ed.cause) : RpcResultBuilder.newWarning(ed.errorType, ed.tag, ed.message, ed.applicationTag, diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index 461bd00f98..1ade84bd0f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -82,7 +82,7 @@ public class RpcManager extends AbstractUntypedActor { withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); rpcBroker = - getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry). + getContext().actorOf(RpcBroker.props(rpcServices). withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); @@ -93,7 +93,7 @@ public class RpcManager extends AbstractUntypedActor { LOG.debug("Registers rpc listeners"); rpcListener = new RpcListener(rpcRegistry); - rpcImplementation = new RemoteRpcImplementation(rpcBroker, config); + rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config); rpcServices.registerRpcListener(rpcListener); @@ -102,10 +102,10 @@ public class RpcManager extends AbstractUntypedActor { } private void registerRoutedRpcDelegate() { - Set rpcIdentifiers = new HashSet<>(); - Set modules = schemaContext.getModules(); - for(Module module : modules){ - for(RpcDefinition rpcDefinition : module.getRpcs()){ + final Set rpcIdentifiers = new HashSet<>(); + final Set modules = schemaContext.getModules(); + for(final Module module : modules){ + for(final RpcDefinition rpcDefinition : module.getRpcs()){ if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) { LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath()); rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY)); @@ -125,7 +125,9 @@ public class RpcManager extends AbstractUntypedActor { for (final RpcDefinition rpcDef : currentlySupportedRpc) { rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath())); } - rpcListener.onRpcAvailable(rpcs); + if(!rpcs.isEmpty()) { + rpcListener.onRpcAvailable(rpcs); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java index 66c0c1b6f0..55f87851cc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java @@ -8,19 +8,27 @@ package org.opendaylight.controller.remote.rpc.messages; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import java.io.Serializable; +import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +/** + * @author tony + * + */ public class ExecuteRpc implements Serializable { private static final long serialVersionUID = 1128904894827335676L; private final NormalizedNodeMessages.Node inputNormalizedNode; private final QName rpc; - public ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) { - Preconditions.checkNotNull(inputNormalizedNode, "Normalized Node input string should be present"); + private ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) { Preconditions.checkNotNull(rpc, "rpc Qname should not be null"); this.inputNormalizedNode = inputNormalizedNode; @@ -34,4 +42,22 @@ public class ExecuteRpc implements Serializable { public QName getRpc() { return rpc; } + + public static ExecuteRpc from(final DOMRpcIdentifier rpc, final NormalizedNode input) { + final Node serializedInput; + if(input != null) { + serializedInput = NormalizedNodeSerializer.serialize(input); + } else { + serializedInput = null; + } + return new ExecuteRpc(serializedInput, rpc.getType().getLastComponent()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("rpc", rpc) + .add("normalizedNode", inputNormalizedNode) + .toString(); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java deleted file mode 100644 index a7fbe8305e..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc.messages; - -import com.google.common.base.Preconditions; -import java.io.Serializable; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; - -public class InvokeRpc implements Serializable { - private static final long serialVersionUID = -2813459607858108953L; - - private final QName rpc; - private final YangInstanceIdentifier identifier; - private final NormalizedNode input; - - public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final NormalizedNode input) { - Preconditions.checkNotNull(rpc, "rpc qname should not be null"); - Preconditions.checkNotNull(input, "rpc input should not be null"); - - this.rpc = rpc; - this.identifier = identifier; - this.input = input; - } - - public QName getRpc() { - return rpc; - } - - public YangInstanceIdentifier getIdentifier() { - return identifier; - } - - public NormalizedNode getInput() { - return input; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java index afe81a8800..f88b0c6adc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java @@ -9,25 +9,37 @@ package org.opendaylight.controller.remote.rpc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import java.io.File; import java.net.URI; import java.util.Arrays; +import java.util.Collection; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.mockito.Mockito; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; /** @@ -45,21 +57,31 @@ public class AbstractRpcTest { static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output"); static final QName TEST_RPC_OUTPUT_DATA = new QName(TEST_URI, "output-data"); + + static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC); + static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(TEST_RPC)); + static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH); + static ActorSystem node1; static ActorSystem node2; + static RemoteRpcProviderConfig config1; + static RemoteRpcProviderConfig config2; protected ActorRef rpcBroker1; - protected JavaTestKit probeReg1; + protected JavaTestKit rpcRegistry1Probe; protected ActorRef rpcBroker2; - protected JavaTestKit probeReg2; + protected JavaTestKit rpcRegistry2Probe; protected Broker.ProviderSession brokerSession; protected SchemaContext schemaContext; - protected DOMRpcService rpcService; + protected RemoteRpcImplementation remoteRpcImpl1; + protected RemoteRpcImplementation remoteRpcImpl2; + protected DOMRpcService domRpcService1; + protected DOMRpcService domRpcService2; @BeforeClass public static void setup() throws InterruptedException { - final RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); - final RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); + config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); + config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); node1 = ActorSystem.create("opendaylight-rpc", config1.get()); node2 = ActorSystem.create("opendaylight-rpc", config2.get()); } @@ -77,13 +99,15 @@ public class AbstractRpcTest { schemaContext = new YangParserImpl().parseFiles(Arrays.asList( new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath()))); - brokerSession = Mockito.mock(Broker.ProviderSession.class); - rpcService = Mockito.mock(DOMRpcService.class); + domRpcService1 = Mockito.mock(DOMRpcService.class); + domRpcService2 = Mockito.mock(DOMRpcService.class); + rpcRegistry1Probe = new JavaTestKit(node1); + rpcBroker1 = node1.actorOf(RpcBroker.props(domRpcService1)); + rpcRegistry2Probe = new JavaTestKit(node2); + rpcBroker2 = node2.actorOf(RpcBroker.props(domRpcService2)); + remoteRpcImpl1 = new RemoteRpcImplementation(rpcRegistry1Probe.getRef(), config1); + remoteRpcImpl2 = new RemoteRpcImplementation(rpcRegistry2Probe.getRef(), config2); - probeReg1 = new JavaTestKit(node1); - rpcBroker1 = node1.actorOf(RpcBroker.props(rpcService, probeReg1.getRef())); - probeReg2 = new JavaTestKit(node2); - rpcBroker2 = node2.actorOf(RpcBroker.props(rpcService, probeReg2.getRef())); } @@ -104,6 +128,38 @@ public class AbstractRpcTest { } } + static void assertCompositeNodeEquals(final NormalizedNode exp, final NormalizedNode actual) { + assertEquals(exp, actual); + } + + static ContainerNode makeRPCInput(final String data) { + return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT)) + .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build(); + + } + + static ContainerNode makeRPCOutput(final String data) { + return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_OUTPUT)) + .withChild(ImmutableNodes.leafNode(TEST_RPC_OUTPUT, data)).build(); + } + + static void assertFailedRpcResult(final DOMRpcResult rpcResult, final ErrorSeverity severity, + final ErrorType errorType, final String tag, final String message, final String applicationTag, final String info, + final String causeMsg) { + + assertNotNull("RpcResult was null", rpcResult); + final Collection rpcErrors = rpcResult.getErrors(); + assertEquals("RpcErrors count", 1, rpcErrors.size()); + assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message, + applicationTag, info, causeMsg); + } + + static void assertSuccessfulRpcResult(final DOMRpcResult rpcResult, + final NormalizedNode expOutput) { + assertNotNull("RpcResult was null", rpcResult); + assertCompositeNodeEquals(expOutput, rpcResult.getResult()); + } + static class TestException extends Exception { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java index 2026d48a81..b3ed7ffb9e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java @@ -8,6 +8,34 @@ package org.opendaylight.controller.remote.rpc; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.japi.Pair; +import akka.testkit.JavaTestKit; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; /*** * Unit tests for RemoteRpcImplementation. @@ -17,7 +45,201 @@ package org.opendaylight.controller.remote.rpc; public class RemoteRpcImplementationTest extends AbstractRpcTest { - private RemoteRpcProviderConfig getConfig(){ + + @Test(expected = DOMRpcImplementationNotAvailableException.class) + public void testInvokeRpcWithNoRemoteActor() throws Exception { + final ContainerNode input = makeRPCInput("foo"); + final CheckedFuture failedFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, input); + rpcRegistry1Probe.expectMsgClass(JavaTestKit.duration("5 seconds"), RpcRegistry.Messages.FindRouters.class); + rpcRegistry1Probe + .reply(new RpcRegistry.Messages.FindRoutersReply(Collections.>emptyList())); + failedFuture.checkedGet(5, TimeUnit.SECONDS); + } + + + /** + * This test method invokes and executes the remote rpc + */ + @Test + public void testInvokeRpc() throws Exception { + final ContainerNode rpcOutput = makeRPCOutput("bar"); + final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput); + + final NormalizedNode invokeRpcInput = makeRPCInput("foo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor> inputCaptor = + (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class); + + when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn( + Futures.immediateCheckedFuture(rpcResult)); + + final CheckedFuture frontEndFuture = + remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput); + assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture); + final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + final RouteIdentifier routeIdentifier = findRouters.getRouteIdentifier(); + assertEquals("getType", TEST_RPC, routeIdentifier.getType()); + assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute()); + + rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair( + rpcBroker2, 200L)))); + + final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS); + assertEquals(rpcOutput, result.getResult()); + } + + /** + * This test method invokes and executes the remote rpc + */ + @Test + public void testInvokeRpcWithNullInput() throws Exception { + final ContainerNode rpcOutput = makeRPCOutput("bar"); + final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput); + + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor> inputCaptor = + (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class); + + when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn( + Futures.immediateCheckedFuture(rpcResult)); + + final CheckedFuture frontEndFuture = + remoteRpcImpl1.invokeRpc(TEST_RPC_ID, null); + assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture); + final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + final RouteIdentifier routeIdentifier = findRouters.getRouteIdentifier(); + assertEquals("getType", TEST_RPC, routeIdentifier.getType()); + assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute()); + + rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair( + rpcBroker2, 200L)))); + + final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS); + assertEquals(rpcOutput, result.getResult()); + } + + + /** + * This test method invokes and executes the remote rpc + */ + @Test + public void testInvokeRpcWithNoOutput() throws Exception { + final ContainerNode rpcOutput = null; + final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput); + + final NormalizedNode invokeRpcInput = makeRPCInput("foo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor> inputCaptor = + (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class); + + when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn( + Futures.immediateCheckedFuture(rpcResult)); + + final CheckedFuture frontEndFuture = + remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput); + assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture); + final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + final RouteIdentifier routeIdentifier = findRouters.getRouteIdentifier(); + assertEquals("getType", TEST_RPC, routeIdentifier.getType()); + assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute()); + + rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair( + rpcBroker2, 200L)))); + + final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS); + assertNull(result.getResult()); + } + + + /** + * This test method invokes and executes the remote rpc + */ + @Test(expected = DOMRpcException.class) + public void testInvokeRpcWithRemoteFailedFuture() throws Exception { + final ContainerNode rpcOutput = null; + final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput); + + final NormalizedNode invokeRpcInput = makeRPCInput("foo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor> inputCaptor = + (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class); + + when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn( + Futures.immediateFailedCheckedFuture(new DOMRpcException( + "Test Exception") {})); + + final CheckedFuture frontEndFuture = + remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput); + assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture); + final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + final RouteIdentifier routeIdentifier = findRouters.getRouteIdentifier(); + assertEquals("getType", TEST_RPC, routeIdentifier.getType()); + assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute()); + + rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair( + rpcBroker2, 200L)))); + frontEndFuture.checkedGet(5, TimeUnit.SECONDS); + } + + /** + * This test method invokes and tests exceptions when akka timeout occured + * + * Currently ignored since this test with current config takes around 15 seconds + * to complete. + * + */ + @Ignore + @Test(expected = RemoteDOMRpcException.class) + public void testInvokeRpcWithAkkaTimeoutException() throws Exception { + final NormalizedNode invokeRpcInput = makeRPCInput("foo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor> inputCaptor = + (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class); + final CheckedFuture frontEndFuture = + remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput); + assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture); + final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + final RouteIdentifier routeIdentifier = findRouters.getRouteIdentifier(); + assertEquals("getType", TEST_RPC, routeIdentifier.getType()); + assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute()); + + frontEndFuture.checkedGet(20, TimeUnit.SECONDS); + } + + /** + * This test method invokes remote rpc and lookup failed + * with runtime exception. + */ + @Test(expected = DOMRpcException.class) + public void testInvokeRpcWithLookupException() throws Exception { + final NormalizedNode invokeRpcInput = makeRPCInput("foo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor> inputCaptor = + (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class); + final CheckedFuture frontEndFuture = + remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput); + assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture); + final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + final RouteIdentifier routeIdentifier = findRouters.getRouteIdentifier(); + assertEquals("getType", TEST_RPC, routeIdentifier.getType()); + assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute()); + rpcRegistry1Probe.reply( new Status.Failure(new RuntimeException("test"))); + frontEndFuture.checkedGet(5, TimeUnit.SECONDS); + } + + /** + * This test method invokes and executes the remote rpc + */ + @Test(expected = DOMRpcImplementationNotAvailableException.class) + public void testInvokeRpcWithLoopException() throws Exception { + final NormalizedNode invokeRpcInput = RemoteRpcInput.from(NormalizedNodeSerializer.serialize(makeRPCInput("foo"))); + final CheckedFuture frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput); + + frontEndFuture.checkedGet(5, TimeUnit.SECONDS); + } + + + private RemoteRpcProviderConfig getConfig() { return new RemoteRpcProviderConfig.Builder("unit-test").build(); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java index 5cd3df3a24..78a368fe8d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java @@ -10,11 +10,25 @@ package org.opendaylight.controller.remote.rpc; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import com.typesafe.config.Config; +import java.util.concurrent.TimeUnit; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.model.SchemaService; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; public class RemoteRpcProviderTest { @@ -35,4 +49,22 @@ public class RemoteRpcProviderTest { system = null; } + @Test + public void testRemoteRpcProvider() throws Exception { + final RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class)); + final Broker.ProviderSession session = mock(Broker.ProviderSession.class); + final SchemaService schemaService = mock(SchemaService.class); + when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class)); + when(session.getService(SchemaService.class)).thenReturn(schemaService); + when(session.getService(DOMRpcService.class)).thenReturn(mock(DOMRpcService.class)); + + rpcProvider.onSessionInitiated(session); + + final ActorRef actorRef = Await.result( + system.actorSelection( + moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)), + Duration.create(2, TimeUnit.SECONDS)); + + Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath())); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java index 16b13910a8..6ecd7dac3a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java @@ -9,8 +9,71 @@ package org.opendaylight.controller.remote.rpc; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +import akka.actor.Status.Failure; +import akka.testkit.JavaTestKit; +import com.google.common.util.concurrent.Futures; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; +import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class RpcBrokerTest extends AbstractRpcTest { + @Test + public void testExecuteRpc() { + new JavaTestKit(node1) { + { + + final ContainerNode invokeRpcResult = makeRPCOutput("bar"); + final DOMRpcResult rpcResult = new DefaultDOMRpcResult(invokeRpcResult); + when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.>any())).thenReturn( + Futures.immediateCheckedFuture(rpcResult)); + + final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null); + + rpcBroker1.tell(executeMsg, getRef()); + + final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); + + assertEquals(rpcResult.getResult(), + NormalizedNodeSerializer.deSerialize(rpcResponse.getResultNormalizedNode())); + } + }; + } + + @Test + public void testExecuteRpcFailureWithException() { + + new JavaTestKit(node1) { + { + + when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.>any())) + .thenReturn( + Futures.immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException( + "NOT FOUND"))); + + final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null); + + rpcBroker1.tell(executeMsg, getRef()); + + final Failure rpcResponse = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + + Assert.assertTrue(rpcResponse.cause() instanceof DOMRpcException); + } + }; + + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java index ecfaef8419..37d81c7afb 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java @@ -8,26 +8,70 @@ package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import com.typesafe.config.ConfigFactory; +import java.net.URISyntaxException; +import java.util.Collections; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; public class RpcListenerTest { - static ActorSystem system; + private static final QName TEST_QNAME = QName.create("test", "2015-06-12", "test"); + private static final SchemaPath RPC_TYPE = SchemaPath.create(true, TEST_QNAME); + private static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier + .create(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)); + private static final DOMRpcIdentifier RPC_ID = DOMRpcIdentifier.create(RPC_TYPE, TEST_PATH); + static ActorSystem system; + + + @BeforeClass + public static void setup() throws InterruptedException { + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + @Test + public void testRouteAdd() throws URISyntaxException, InterruptedException { + new JavaTestKit(system) { + { + // Test announcements + final JavaTestKit probeReg = new JavaTestKit(system); + final ActorRef rpcRegistry = probeReg.getRef(); - @BeforeClass - public static void setup() throws InterruptedException { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); - } + final RpcListener rpcListener = new RpcListener(rpcRegistry); + rpcListener.onRpcAvailable(Collections.singleton(RPC_ID)); + probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class); + } + }; + } - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } + @Test + public void testRouteRemove() throws URISyntaxException, InterruptedException { + new JavaTestKit(system) { + { + // Test announcements + final JavaTestKit probeReg = new JavaTestKit(system); + final ActorRef rpcRegistry = probeReg.getRef(); + final RpcListener rpcListener = new RpcListener(rpcRegistry); + rpcListener.onRpcUnavailable(Collections.singleton(RPC_ID)); + probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class); + } + }; + } } -- 2.36.6