package org.opendaylight.controller.remote.rpc;
import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.CheckedFuture;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
+/**
+ * @author tony
+ *
+ */
class RemoteDOMRpcFuture extends AbstractFuture<DOMRpcResult> implements CheckedFuture<DOMRpcResult, DOMRpcException> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteDOMRpcFuture.class);
- private RemoteDOMRpcFuture(final Future<Object> future) {
- future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global());
+ private final QName rpcName;
+
+ private RemoteDOMRpcFuture(final QName rpcName) {
+ this.rpcName = Preconditions.checkNotNull(rpcName,"rpcName");
}
- public static CheckedFuture<DOMRpcResult, DOMRpcException> from(final Future<Object> future) {
- return new RemoteDOMRpcFuture(future);
+ public static RemoteDOMRpcFuture create(final QName rpcName) {
+ return new RemoteDOMRpcFuture(rpcName);
+ }
+
+ protected void failNow(final Throwable error) {
+ LOG.debug("Failing future {} for rpc {}", this, rpcName, error);
+ setException(error);
+ }
+
+ protected void completeWith(final Future<Object> future) {
+ future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global());
}
@Override
@Override
public void onComplete(final Throwable error, final Object reply) throws Throwable {
if (error != null) {
- RemoteDOMRpcFuture.this.setException(error);
+ RemoteDOMRpcFuture.this.failNow(error);
} else if (reply instanceof RpcResponse) {
final RpcResponse rpcReply = (RpcResponse) reply;
final NormalizedNode<?, ?> result;
if (rpcReply.getResultNormalizedNode() == null) {
result = null;
- LOG.debug("Received response for invoke rpc: result is null");
+ LOG.debug("Received response for rpc {}: result is null", rpcName);
} else {
result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
- LOG.debug("Received response for invoke rpc: result is {}", result);
+ LOG.debug("Received response for rpc {}: result is {}", rpcName, result);
}
RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result));
+ LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName);
}
- RemoteDOMRpcFuture.this.setException(new IllegalStateException("Incorrect reply type " + reply
+ RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply
+ "from Akka"));
}
}
import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.japi.Pair;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.List;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
+import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
public class RemoteRpcImplementation implements DOMRpcImplementation {
- private final ActorRef rpcBroker;
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+
+ private final ActorRef rpcRegistry;
private final RemoteRpcProviderConfig config;
- public RemoteRpcImplementation(final ActorRef rpcBroker, final RemoteRpcProviderConfig config) {
- this.rpcBroker = rpcBroker;
+ public RemoteRpcImplementation(final ActorRef rpcRegistry, final RemoteRpcProviderConfig config) {
this.config = config;
+ this.rpcRegistry = rpcRegistry;
}
@Override
- public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
- final InvokeRpc rpcMsg = new InvokeRpc(rpc.getType().getLastComponent(), rpc.getContextReference(), input);
- final scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
- return RemoteDOMRpcFuture.from(future);
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final DOMRpcIdentifier rpc,
+ final NormalizedNode<?, ?> input) {
+ if (input instanceof RemoteRpcInput) {
+ LOG.warn("Rpc {} was removed during execution or there is loop present. Failing received rpc.", rpc);
+ return Futures
+ .<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException(
+ "Rpc implementation for {} was removed during processing.", rpc));
+ }
+ final RemoteDOMRpcFuture frontEndFuture = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
+ findRouteAsync(rpc).onComplete(new OnComplete<FindRoutersReply>() {
+
+ @Override
+ public void onComplete(final Throwable error, final FindRoutersReply routes) throws Throwable {
+ if (error != null) {
+ frontEndFuture.failNow(error);
+ } else {
+ final List<Pair<ActorRef, Long>> routePairs = routes.getRouterWithUpdateTime();
+ if (routePairs == null || routePairs.isEmpty()) {
+ frontEndFuture.failNow(new DOMRpcImplementationNotAvailableException(
+ "No local or remote implementation available for rpc %s", rpc.getType(), error));
+ } else {
+ final ActorRef remoteImplRef = new LatestEntryRoutingLogic(routePairs).select();
+ final Object executeRpcMessage = ExecuteRpc.from(rpc, input);
+ LOG.debug("Found remote actor {} for rpc {} - sending {}", remoteImplRef, rpc.getType(), executeRpcMessage);
+ frontEndFuture.completeWith(ask(remoteImplRef, executeRpcMessage, config.getAskDuration()));
+ }
+ }
+ }
+ }, ExecutionContext.Implicits$.MODULE$.global());
+ return frontEndFuture;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private Future<FindRoutersReply> findRouteAsync(final DOMRpcIdentifier rpc) {
+ // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
+ final RpcRouter.RouteIdentifier<?, ?, ?> routeId =
+ new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
+ final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+ return (Future) ask(rpcRegistry, findMsg, config.getAskDuration());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+
+class RemoteRpcInput implements ContainerNode {
+
+ private final ContainerNode delegate;
+
+ private RemoteRpcInput(final ContainerNode delegate) {
+ this.delegate = delegate;
+ }
+
+ protected static RemoteRpcInput from(final Node node) {
+ if(node == null) {
+ return null;
+ }
+ final NormalizedNode<?, ?> deserialized = NormalizedNodeSerializer.deSerialize(node);
+ Preconditions.checkArgument(deserialized instanceof ContainerNode);
+ return new RemoteRpcInput((ContainerNode) deserialized);
+ }
+
+ ContainerNode delegate() {
+ return delegate;
+ }
+
+ @Override
+ public Map<QName, String> getAttributes() {
+ return delegate().getAttributes();
+ }
+
+ @Override
+ public Object getAttributeValue(final QName name) {
+ return delegate().getAttributeValue(name);
+ }
+
+ @Override
+ public QName getNodeType() {
+ return delegate().getNodeType();
+ }
+
+ @Override
+ public Collection<DataContainerChild<? extends PathArgument, ?>> getValue() {
+ return delegate().getValue();
+ }
+
+ @Override
+ public NodeIdentifier getIdentifier() {
+ return delegate().getIdentifier();
+ }
+
+ @Override
+ public Optional<DataContainerChild<? extends PathArgument, ?>> getChild(final PathArgument child) {
+ return delegate().getChild(child);
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate().hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ return delegate().equals(obj);
+ }
+}
package org.opendaylight.controller.remote.rpc;
-import static akka.pattern.Patterns.ask;
-
import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.dispatch.OnComplete;
import akka.japi.Creator;
-import akka.japi.Pair;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
-import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
public class RpcBroker extends AbstractUntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
- private final ActorRef rpcRegistry;
- private final RemoteRpcProviderConfig config;
private final DOMRpcService rpcService;
- private RpcBroker(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+ private RpcBroker(final DOMRpcService rpcService) {
this.rpcService = rpcService;
- this.rpcRegistry = rpcRegistry;
- config = new RemoteRpcProviderConfig(getContext().system().settings().config());
}
- public static Props props(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
- Preconditions.checkNotNull(rpcRegistry, "ActorRef can not be null!");
+ public static Props props(final DOMRpcService rpcService) {
Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
- return Props.create(new RpcBrokerCreator(rpcService, rpcRegistry));
+ return Props.create(new RpcBrokerCreator(rpcService));
}
@Override
protected void handleReceive(final Object message) throws Exception {
- if(message instanceof InvokeRpc) {
- invokeRemoteRpc((InvokeRpc) message);
- } else if(message instanceof ExecuteRpc) {
+ if (message instanceof ExecuteRpc) {
executeRpc((ExecuteRpc) message);
}
}
- private void invokeRemoteRpc(final InvokeRpc msg) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
- }
- final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
- null, msg.getRpc(), msg.getIdentifier());
- final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
-
- final scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
-
- final ActorRef sender = getSender();
- final ActorRef self = self();
-
- final OnComplete<Object> onComplete = new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object reply) throws Throwable {
- if(failure != null) {
- LOG.error("FindRouters failed", failure);
- sender.tell(new akka.actor.Status.Failure(failure), self);
- return;
- }
-
- final RpcRegistry.Messages.FindRoutersReply findReply =
- (RpcRegistry.Messages.FindRoutersReply)reply;
-
- final List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
-
- if(actorRefList == null || actorRefList.isEmpty()) {
- sender.tell(new akka.actor.Status.Failure(new DOMRpcImplementationNotAvailableException(
- "No remote implementation available for rpc %s", msg.getRpc())), self);
- return;
- }
- finishInvokeRpc(actorRefList, msg, sender, self);
- }
- };
-
- future.onComplete(onComplete, getContext().dispatcher());
- }
-
- protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
- final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
-
- final RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
-
- final Node serializedNode = NormalizedNodeSerializer.serialize(msg.getInput());
- final ExecuteRpc executeMsg = new ExecuteRpc(serializedNode, msg.getRpc());
-
- final scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
-
- final OnComplete<Object> onComplete = new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object reply) throws Throwable {
- if(failure != null) {
- LOG.error("ExecuteRpc failed", failure);
- sender.tell(new akka.actor.Status.Failure(failure), self);
- return;
- }
-
- LOG.debug("Execute Rpc response received for rpc : {}, responding to sender : {}", msg.getRpc(), sender);
-
- sender.tell(reply, self);
- }
- };
-
- future.onComplete(onComplete, getContext().dispatcher());
- }
-
private void executeRpc(final ExecuteRpc msg) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Executing rpc {}", msg.getRpc());
- }
- final NormalizedNode<?, ?> input = NormalizedNodeSerializer.deSerialize(msg.getInputNormalizedNode());
+ LOG.debug("Executing rpc {}", msg.getRpc());
+ final NormalizedNode<?, ?> input = RemoteRpcInput.from(msg.getInputNormalizedNode());
final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
-
- final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
-
- final ListenableFuture<DOMRpcResult> listenableFuture =
- JdkFutureAdapters.listenInPoolThread(future);
-
final ActorRef sender = getSender();
final ActorRef self = self();
- Futures.addCallback(listenableFuture, new FutureCallback<DOMRpcResult>() {
- @Override
- public void onSuccess(final DOMRpcResult result) {
- if (result.getErrors() != null && ( ! result.getErrors().isEmpty())) {
- final String message = String.format("Execution of RPC %s failed", msg.getRpc());
- Collection<RpcError> errors = result.getErrors();
- if(errors == null || errors.size() == 0) {
- errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
- null, message));
- }
+ try {
+ final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
- sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
- message, errors)), self);
- } else {
- final Node serializedResultNode;
- if(result.getResult() == null){
- serializedResultNode = null;
+ Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
+ @Override
+ public void onSuccess(final DOMRpcResult result) {
+ if (result.getErrors() != null && (!result.getErrors().isEmpty())) {
+ final String message = String.format("Execution of RPC %s failed", msg.getRpc());
+ Collection<RpcError> errors = result.getErrors();
+ if (errors == null || errors.size() == 0) {
+ errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, null, message));
+ }
+
+ sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, errors)), self);
} else {
- serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
- }
+ final Node serializedResultNode;
+ if (result.getResult() == null) {
+ serializedResultNode = null;
+ } else {
+ serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+ }
- LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
+ LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
- sender.tell(new RpcResponse(serializedResultNode), self);
+ sender.tell(new RpcResponse(serializedResultNode), self);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
+ sender.tell(new akka.actor.Status.Failure(t), self);
}
- }
-
- @Override
- public void onFailure(final Throwable t) {
- LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
- sender.tell(new akka.actor.Status.Failure(t), self);
- }
- });
+ });
+ } catch (final Exception e) {
+ sender.tell(new akka.actor.Status.Failure(e), sender);
+ }
}
private static class RpcBrokerCreator implements Creator<RpcBroker> {
private static final long serialVersionUID = 1L;
final DOMRpcService rpcService;
- final ActorRef rpcRegistry;
- RpcBrokerCreator(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+ RpcBrokerCreator(final DOMRpcService rpcService) {
this.rpcService = rpcService;
- this.rpcRegistry = rpcRegistry;
}
@Override
public RpcBroker create() throws Exception {
- return new RpcBroker(rpcService, rpcRegistry);
+ return new RpcBroker(rpcService);
}
}
}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
*
* @author Thomas Pantelis
*/
-public class RpcErrorsException extends Exception {
+public class RpcErrorsException extends DOMRpcException {
private static final long serialVersionUID = 1L;
final String info;
final Throwable cause;
- RpcErrorData(ErrorSeverity severity, ErrorType errorType, String tag,
- String applicationTag, String message, String info, Throwable cause) {
+ RpcErrorData(final ErrorSeverity severity, final ErrorType errorType, final String tag,
+ final String applicationTag, final String message, final String info, final Throwable cause) {
this.severity = severity;
this.errorType = errorType;
this.tag = tag;
private final List<RpcErrorData> rpcErrorDataList = new ArrayList<>();
- public RpcErrorsException(String message, Iterable<RpcError> rpcErrors) {
+ public RpcErrorsException(final String message, final Iterable<RpcError> rpcErrors) {
super(message);
- for(RpcError rpcError: rpcErrors) {
+ for(final RpcError rpcError: rpcErrors) {
rpcErrorDataList.add(new RpcErrorData(rpcError.getSeverity(), rpcError.getErrorType(),
rpcError.getTag(), rpcError.getApplicationTag(), rpcError.getMessage(),
rpcError.getInfo(), rpcError.getCause()));
}
public Collection<RpcError> getRpcErrors() {
- Collection<RpcError> rpcErrors = new ArrayList<>();
- for(RpcErrorData ed: rpcErrorDataList) {
- RpcError rpcError = ed.severity == ErrorSeverity.ERROR ?
+ final Collection<RpcError> rpcErrors = new ArrayList<>();
+ for(final RpcErrorData ed: rpcErrorDataList) {
+ final RpcError rpcError = ed.severity == ErrorSeverity.ERROR ?
RpcResultBuilder.newError(ed.errorType, ed.tag, ed.message, ed.applicationTag,
ed.info, ed.cause) :
RpcResultBuilder.newWarning(ed.errorType, ed.tag, ed.message, ed.applicationTag,
withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
rpcBroker =
- getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry).
+ getContext().actorOf(RpcBroker.props(rpcServices).
withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
LOG.debug("Registers rpc listeners");
rpcListener = new RpcListener(rpcRegistry);
- rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
+ rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
rpcServices.registerRpcListener(rpcListener);
}
private void registerRoutedRpcDelegate() {
- Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
- Set<Module> modules = schemaContext.getModules();
- for(Module module : modules){
- for(RpcDefinition rpcDefinition : module.getRpcs()){
+ final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
+ final Set<Module> modules = schemaContext.getModules();
+ for(final Module module : modules){
+ for(final RpcDefinition rpcDefinition : module.getRpcs()){
if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
for (final RpcDefinition rpcDef : currentlySupportedRpc) {
rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
}
- rpcListener.onRpcAvailable(rpcs);
+ if(!rpcs.isEmpty()) {
+ rpcListener.onRpcAvailable(rpcs);
+ }
}
package org.opendaylight.controller.remote.rpc.messages;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.io.Serializable;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+/**
+ * @author tony
+ *
+ */
public class ExecuteRpc implements Serializable {
private static final long serialVersionUID = 1128904894827335676L;
private final NormalizedNodeMessages.Node inputNormalizedNode;
private final QName rpc;
- public ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) {
- Preconditions.checkNotNull(inputNormalizedNode, "Normalized Node input string should be present");
+ private ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) {
Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
this.inputNormalizedNode = inputNormalizedNode;
public QName getRpc() {
return rpc;
}
+
+ public static ExecuteRpc from(final DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
+ final Node serializedInput;
+ if(input != null) {
+ serializedInput = NormalizedNodeSerializer.serialize(input);
+ } else {
+ serializedInput = null;
+ }
+ return new ExecuteRpc(serializedInput, rpc.getType().getLastComponent());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("rpc", rpc)
+ .add("normalizedNode", inputNormalizedNode)
+ .toString();
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc.messages;
-
-import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class InvokeRpc implements Serializable {
- private static final long serialVersionUID = -2813459607858108953L;
-
- private final QName rpc;
- private final YangInstanceIdentifier identifier;
- private final NormalizedNode<?,?> input;
-
- public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final NormalizedNode<?,?> input) {
- Preconditions.checkNotNull(rpc, "rpc qname should not be null");
- Preconditions.checkNotNull(input, "rpc input should not be null");
-
- this.rpc = rpc;
- this.identifier = identifier;
- this.input = input;
- }
-
- public QName getRpc() {
- return rpc;
- }
-
- public YangInstanceIdentifier getIdentifier() {
- return identifier;
- }
-
- public NormalizedNode<?,?> getInput() {
- return input;
- }
-}
package org.opendaylight.controller.remote.rpc;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.io.File;
import java.net.URI;
import java.util.Arrays;
+import java.util.Collection;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.mockito.Mockito;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
/**
static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output");
static final QName TEST_RPC_OUTPUT_DATA = new QName(TEST_URI, "output-data");
+
+ static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC);
+ static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(TEST_RPC));
+ static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
+
static ActorSystem node1;
static ActorSystem node2;
+ static RemoteRpcProviderConfig config1;
+ static RemoteRpcProviderConfig config2;
protected ActorRef rpcBroker1;
- protected JavaTestKit probeReg1;
+ protected JavaTestKit rpcRegistry1Probe;
protected ActorRef rpcBroker2;
- protected JavaTestKit probeReg2;
+ protected JavaTestKit rpcRegistry2Probe;
protected Broker.ProviderSession brokerSession;
protected SchemaContext schemaContext;
- protected DOMRpcService rpcService;
+ protected RemoteRpcImplementation remoteRpcImpl1;
+ protected RemoteRpcImplementation remoteRpcImpl2;
+ protected DOMRpcService domRpcService1;
+ protected DOMRpcService domRpcService2;
@BeforeClass
public static void setup() throws InterruptedException {
- final RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
- final RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+ config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+ config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
node1 = ActorSystem.create("opendaylight-rpc", config1.get());
node2 = ActorSystem.create("opendaylight-rpc", config2.get());
}
schemaContext = new YangParserImpl().parseFiles(Arrays.asList(
new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath())));
- brokerSession = Mockito.mock(Broker.ProviderSession.class);
- rpcService = Mockito.mock(DOMRpcService.class);
+ domRpcService1 = Mockito.mock(DOMRpcService.class);
+ domRpcService2 = Mockito.mock(DOMRpcService.class);
+ rpcRegistry1Probe = new JavaTestKit(node1);
+ rpcBroker1 = node1.actorOf(RpcBroker.props(domRpcService1));
+ rpcRegistry2Probe = new JavaTestKit(node2);
+ rpcBroker2 = node2.actorOf(RpcBroker.props(domRpcService2));
+ remoteRpcImpl1 = new RemoteRpcImplementation(rpcRegistry1Probe.getRef(), config1);
+ remoteRpcImpl2 = new RemoteRpcImplementation(rpcRegistry2Probe.getRef(), config2);
- probeReg1 = new JavaTestKit(node1);
- rpcBroker1 = node1.actorOf(RpcBroker.props(rpcService, probeReg1.getRef()));
- probeReg2 = new JavaTestKit(node2);
- rpcBroker2 = node2.actorOf(RpcBroker.props(rpcService, probeReg2.getRef()));
}
}
}
+ static void assertCompositeNodeEquals(final NormalizedNode<? , ?> exp, final NormalizedNode<? , ? > actual) {
+ assertEquals(exp, actual);
+ }
+
+ static ContainerNode makeRPCInput(final String data) {
+ return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT))
+ .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build();
+
+ }
+
+ static ContainerNode makeRPCOutput(final String data) {
+ return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_OUTPUT))
+ .withChild(ImmutableNodes.leafNode(TEST_RPC_OUTPUT, data)).build();
+ }
+
+ static void assertFailedRpcResult(final DOMRpcResult rpcResult, final ErrorSeverity severity,
+ final ErrorType errorType, final String tag, final String message, final String applicationTag, final String info,
+ final String causeMsg) {
+
+ assertNotNull("RpcResult was null", rpcResult);
+ final Collection<RpcError> rpcErrors = rpcResult.getErrors();
+ assertEquals("RpcErrors count", 1, rpcErrors.size());
+ assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message,
+ applicationTag, info, causeMsg);
+ }
+
+ static void assertSuccessfulRpcResult(final DOMRpcResult rpcResult,
+ final NormalizedNode<? , ?> expOutput) {
+ assertNotNull("RpcResult was null", rpcResult);
+ assertCompositeNodeEquals(expOutput, rpcResult.getResult());
+ }
+
static class TestException extends Exception {
private static final long serialVersionUID = 1L;
package org.opendaylight.controller.remote.rpc;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.japi.Pair;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/***
* Unit tests for RemoteRpcImplementation.
public class RemoteRpcImplementationTest extends AbstractRpcTest {
- private RemoteRpcProviderConfig getConfig(){
+
+ @Test(expected = DOMRpcImplementationNotAvailableException.class)
+ public void testInvokeRpcWithNoRemoteActor() throws Exception {
+ final ContainerNode input = makeRPCInput("foo");
+ final CheckedFuture<DOMRpcResult, DOMRpcException> failedFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, input);
+ rpcRegistry1Probe.expectMsgClass(JavaTestKit.duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
+ rpcRegistry1Probe
+ .reply(new RpcRegistry.Messages.FindRoutersReply(Collections.<Pair<ActorRef, Long>>emptyList()));
+ failedFuture.checkedGet(5, TimeUnit.SECONDS);
+ }
+
+
+ /**
+ * This test method invokes and executes the remote rpc
+ */
+ @Test
+ public void testInvokeRpc() throws Exception {
+ final ContainerNode rpcOutput = makeRPCOutput("bar");
+ final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+
+ final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+ (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+
+ when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
+ Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+ remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+ final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+ assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+ assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+ rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<ActorRef, Long>(
+ rpcBroker2, 200L))));
+
+ final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+ assertEquals(rpcOutput, result.getResult());
+ }
+
+ /**
+ * This test method invokes and executes the remote rpc
+ */
+ @Test
+ public void testInvokeRpcWithNullInput() throws Exception {
+ final ContainerNode rpcOutput = makeRPCOutput("bar");
+ final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+ (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+
+ when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
+ Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+ remoteRpcImpl1.invokeRpc(TEST_RPC_ID, null);
+ assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+ final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+ assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+ assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+ rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<ActorRef, Long>(
+ rpcBroker2, 200L))));
+
+ final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+ assertEquals(rpcOutput, result.getResult());
+ }
+
+
+ /**
+ * This test method invokes and executes the remote rpc
+ */
+ @Test
+ public void testInvokeRpcWithNoOutput() throws Exception {
+ final ContainerNode rpcOutput = null;
+ final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+
+ final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+ (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+
+ when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
+ Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+ remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+ final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+ assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+ assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+ rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<ActorRef, Long>(
+ rpcBroker2, 200L))));
+
+ final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+ assertNull(result.getResult());
+ }
+
+
+ /**
+ * This test method invokes and executes the remote rpc
+ */
+ @Test(expected = DOMRpcException.class)
+ public void testInvokeRpcWithRemoteFailedFuture() throws Exception {
+ final ContainerNode rpcOutput = null;
+ final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+
+ final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+ (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+
+ when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
+ Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcException(
+ "Test Exception") {}));
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+ remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+ final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+ assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+ assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+ rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<ActorRef, Long>(
+ rpcBroker2, 200L))));
+ frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+ }
+
+ /**
+ * This test method invokes and tests exceptions when akka timeout occured
+ *
+ * Currently ignored since this test with current config takes around 15 seconds
+ * to complete.
+ *
+ */
+ @Ignore
+ @Test(expected = RemoteDOMRpcException.class)
+ public void testInvokeRpcWithAkkaTimeoutException() throws Exception {
+ final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+ (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+ final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+ remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+ final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+ assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+ assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+
+ frontEndFuture.checkedGet(20, TimeUnit.SECONDS);
+ }
+
+ /**
+ * This test method invokes remote rpc and lookup failed
+ * with runtime exception.
+ */
+ @Test(expected = DOMRpcException.class)
+ public void testInvokeRpcWithLookupException() throws Exception {
+ final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
+ (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+ final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+ remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
+ final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+ assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+ assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
+ rpcRegistry1Probe.reply( new Status.Failure(new RuntimeException("test")));
+ frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+ }
+
+ /**
+ * This test method invokes and executes the remote rpc
+ */
+ @Test(expected = DOMRpcImplementationNotAvailableException.class)
+ public void testInvokeRpcWithLoopException() throws Exception {
+ final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(NormalizedNodeSerializer.serialize(makeRPCInput("foo")));
+ final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+
+ frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
+ }
+
+
+ private RemoteRpcProviderConfig getConfig() {
return new RemoteRpcProviderConfig.Builder("unit-test").build();
}
}
package org.opendaylight.controller.remote.rpc;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.typesafe.config.Config;
+import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
public class RemoteRpcProviderTest {
system = null;
}
+ @Test
+ public void testRemoteRpcProvider() throws Exception {
+ final RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class));
+ final Broker.ProviderSession session = mock(Broker.ProviderSession.class);
+ final SchemaService schemaService = mock(SchemaService.class);
+ when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
+ when(session.getService(SchemaService.class)).thenReturn(schemaService);
+ when(session.getService(DOMRpcService.class)).thenReturn(mock(DOMRpcService.class));
+
+ rpcProvider.onSessionInitiated(session);
+
+ final ActorRef actorRef = Await.result(
+ system.actorSelection(
+ moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
+ Duration.create(2, TimeUnit.SECONDS));
+
+ Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
+ }
}
package org.opendaylight.controller.remote.rpc;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
+import akka.actor.Status.Failure;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class RpcBrokerTest extends AbstractRpcTest {
+ @Test
+ public void testExecuteRpc() {
+ new JavaTestKit(node1) {
+ {
+
+ final ContainerNode invokeRpcResult = makeRPCOutput("bar");
+ final DOMRpcResult rpcResult = new DefaultDOMRpcResult(invokeRpcResult);
+ when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.<NormalizedNode<?, ?>>any())).thenReturn(
+ Futures.<DOMRpcResult, DOMRpcException>immediateCheckedFuture(rpcResult));
+
+ final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+ assertEquals(rpcResult.getResult(),
+ NormalizedNodeSerializer.deSerialize(rpcResponse.getResultNormalizedNode()));
+ }
+ };
+ }
+
+ @Test
+ public void testExecuteRpcFailureWithException() {
+
+ new JavaTestKit(node1) {
+ {
+
+ when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.<NormalizedNode<?, ?>>any()))
+ .thenReturn(
+ Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException(
+ "NOT FOUND")));
+
+ final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ final Failure rpcResponse = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+ Assert.assertTrue(rpcResponse.cause() instanceof DOMRpcException);
+ }
+ };
+
+ }
}
package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.typesafe.config.ConfigFactory;
+import java.net.URISyntaxException;
+import java.util.Collections;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
public class RpcListenerTest {
- static ActorSystem system;
+ private static final QName TEST_QNAME = QName.create("test", "2015-06-12", "test");
+ private static final SchemaPath RPC_TYPE = SchemaPath.create(true, TEST_QNAME);
+ private static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier
+ .create(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME));
+ private static final DOMRpcIdentifier RPC_ID = DOMRpcIdentifier.create(RPC_TYPE, TEST_PATH);
+ static ActorSystem system;
+
+
+ @BeforeClass
+ public static void setup() throws InterruptedException {
+ system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+ @Test
+ public void testRouteAdd() throws URISyntaxException, InterruptedException {
+ new JavaTestKit(system) {
+ {
+ // Test announcements
+ final JavaTestKit probeReg = new JavaTestKit(system);
+ final ActorRef rpcRegistry = probeReg.getRef();
- @BeforeClass
- public static void setup() throws InterruptedException {
- system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
- }
+ final RpcListener rpcListener = new RpcListener(rpcRegistry);
+ rpcListener.onRpcAvailable(Collections.singleton(RPC_ID));
+ probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class);
+ }
+ };
+ }
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
+ @Test
+ public void testRouteRemove() throws URISyntaxException, InterruptedException {
+ new JavaTestKit(system) {
+ {
+ // Test announcements
+ final JavaTestKit probeReg = new JavaTestKit(system);
+ final ActorRef rpcRegistry = probeReg.getRef();
+ final RpcListener rpcListener = new RpcListener(rpcRegistry);
+ rpcListener.onRpcUnavailable(Collections.singleton(RPC_ID));
+ probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class);
+ }
+ };
+ }
}