package org.opendaylight.controller.remote.rpc;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.xml.codec.XmlUtils;
-import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
-import java.util.Collections;
-import java.util.Set;
-
-import static akka.pattern.Patterns.ask;
-
-public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
+public class RemoteRpcImplementation implements DOMRpcImplementation {
private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
private final ActorRef rpcBroker;
- private final SchemaContext schemaContext;
private final RemoteRpcProviderConfig config;
- public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext, RemoteRpcProviderConfig config) {
+ public RemoteRpcImplementation(final ActorRef rpcBroker, final RemoteRpcProviderConfig config) {
this.rpcBroker = rpcBroker;
- this.schemaContext = schemaContext;
this.config = config;
}
@Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc,
- YangInstanceIdentifier identifier, CompositeNode input) {
- InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input);
-
- return executeMsg(rpcMsg);
- }
-
- @Override
- public Set<QName> getSupportedRpcs() {
- // TODO : check if we need to get this from routing registry
- return Collections.emptySet();
- }
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
+ final InvokeRpc rpcMsg = new InvokeRpc(rpc.getType().getLastComponent(), rpc.getContextReference(), input);
- @Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
- return executeMsg(rpcMsg);
- }
+ final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
- private ListenableFuture<RpcResult<CompositeNode>> executeMsg(InvokeRpc rpcMsg) {
+ final ListenableFuture<DOMRpcResult> listenableFuture =
+ JdkFutureAdapters.listenInPoolThread(settableFuture);
- final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
+ final scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
- scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
-
- OnComplete<Object> onComplete = new OnComplete<Object>() {
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object reply) throws Throwable {
+ public void onComplete(final Throwable failure, final Object reply) throws Throwable {
if(failure != null) {
LOG.error("InvokeRpc failed", failure);
- RpcResult<CompositeNode> rpcResult;
- if(failure instanceof RpcErrorsException) {
- rpcResult = RpcResultBuilder.<CompositeNode>failed().withRpcErrors(
- ((RpcErrorsException)failure).getRpcErrors()).build();
- } else {
- rpcResult = RpcResultBuilder.<CompositeNode>failed().withError(
- ErrorType.RPC, failure.getMessage(), failure).build();
+ final String message = String.format("Execution of RPC %s failed", rpcMsg.getRpc());
+ Collection<RpcError> errors = ((RpcErrorsException)failure).getRpcErrors();
+ if(errors == null || errors.size() == 0) {
+ errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, null, message));
}
+ final DOMRpcResult rpcResult = new DefaultDOMRpcResult(errors);
- listenableFuture.set(rpcResult);
+ settableFuture.set(rpcResult);
return;
}
- RpcResponse rpcReply = (RpcResponse)reply;
- CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode());
- listenableFuture.set(RpcResultBuilder.success(result).build());
+ final RpcResponse rpcReply = (RpcResponse)reply;
+ final NormalizedNode<?, ?> result =
+ NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
+ settableFuture.set(new DefaultDOMRpcResult(result));
}
};
- future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global());
- return listenableFuture;
+ future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global());
+ // FIXME find non blocking way for implementation
+ try {
+ return Futures.immediateCheckedFuture(listenableFuture.get());
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.debug("Unexpected remote RPC exception.", e);
+ return Futures.immediateFailedCheckedFuture((DOMRpcException) new DOMRpcImplementationNotAvailableException(e, "Unexpected remote RPC exception"));
+ }
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-
/**
* This is the base class which initialize all the actors, listeners and
* default RPc implementation so remote invocation of rpcs.
private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
- private final RpcProvisionRegistry rpcProvisionRegistry;
+ private final DOMRpcProviderService rpcProvisionRegistry;
+ private ListenerRegistration<SchemaContextListener> schemaListenerRegistration;
private ActorSystem actorSystem;
private Broker.ProviderSession brokerSession;
private SchemaContext schemaContext;
private ActorRef rpcManager;
- private RemoteRpcProviderConfig config;
+ private final RemoteRpcProviderConfig config;
- public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
+ public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry) {
this.actorSystem = actorSystem;
this.rpcProvisionRegistry = rpcProvisionRegistry;
- this.config = new RemoteRpcProviderConfig(actorSystem.settings().config());
+ config = new RemoteRpcProviderConfig(actorSystem.settings().config());
}
@Override
public void close() throws Exception {
- if (this.actorSystem != null)
- this.actorSystem.shutdown();
+ if (actorSystem != null) {
+ actorSystem.shutdown();
+ actorSystem = null;
+ }
+ if (schemaListenerRegistration != null) {
+ schemaListenerRegistration.close();
+ schemaListenerRegistration = null;
+ }
}
@Override
- public void onSessionInitiated(Broker.ProviderSession session) {
- this.brokerSession = session;
+ public void onSessionInitiated(final Broker.ProviderSession session) {
+ brokerSession = session;
start();
}
private void start() {
LOG.info("Starting remote rpc service...");
- SchemaService schemaService = brokerSession.getService(SchemaService.class);
+ final SchemaService schemaService = brokerSession.getService(SchemaService.class);
+ final DOMRpcService rpcService = brokerSession.getService(DOMRpcService.class);
schemaContext = schemaService.getGlobalContext();
-
- rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry),
- config.getRpcManagerName());
-
+ rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext,
+ rpcProvisionRegistry, rpcService), config.getRpcManagerName());
+ schemaListenerRegistration = schemaService.registerSchemaContextListener(this);
LOG.debug("rpc manager started");
-
- schemaService.registerSchemaContextListener(this);
}
@Override
- public void onGlobalContextUpdated(SchemaContext schemaContext) {
+ public void onGlobalContextUpdated(final SchemaContext schemaContext) {
this.schemaContext = schemaContext;
rpcManager.tell(new UpdateSchemaContext(schemaContext), null);
-
}
}
import akka.actor.ActorSystem;
import akka.osgi.BundleDelegatingClassLoader;
import com.typesafe.config.Config;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public static RemoteRpcProvider createInstance(
final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){
- RemoteRpcProvider rpcProvider =
- new RemoteRpcProvider(createActorSystem(bundleContext, config), (RpcProvisionRegistry) broker);
+ final RemoteRpcProvider rpcProvider =
+ new RemoteRpcProvider(createActorSystem(bundleContext, config), (DOMRpcProviderService) broker);
broker.registerProvider(rpcProvider);
return rpcProvider;
}
- private static ActorSystem createActorSystem(BundleContext bundleContext, RemoteRpcProviderConfig config){
+ private static ActorSystem createActorSystem(final BundleContext bundleContext, final RemoteRpcProviderConfig config){
// Create an OSGi bundle classloader for actor system
- BundleDelegatingClassLoader classLoader =
+ final BundleDelegatingClassLoader classLoader =
new BundleDelegatingClassLoader(bundleContext.getBundle(),
Thread.currentThread().getContextClassLoader());
- Config actorSystemConfig = config.get();
+ final Config actorSystemConfig = config.get();
if(LOG.isDebugEnabled()) {
LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
}
package org.opendaylight.controller.remote.rpc;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-import java.io.Serializable;
-
public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, YangInstanceIdentifier>,Serializable {
private static final long serialVersionUID = 1L;
@Override
public QName getContext() {
- return this.context;
+ return context;
}
@Override
public QName getType() {
- return this.type;
+ return type;
}
@Override
public YangInstanceIdentifier getRoute() {
- return this.route;
+ return route;
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- RouteIdentifierImpl that = (RouteIdentifierImpl) o;
+ final RouteIdentifierImpl that = (RouteIdentifierImpl) o;
if (context == null){
if (that.getContext() != null) return false;
@Override
public int hashCode() {
- int prime = 31;
+ final int prime = 31;
int result = 0;
result = prime * result + (context == null ? 0:context.hashCode());
result = prime * result + (type == null ? 0:type.hashCode());
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>{
- private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
- private final ActorRef rpcRegistry;
+ private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
+ private final ActorRef rpcRegistry;
- public RoutedRpcListener(ActorRef rpcRegistry) {
- Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null");
-
- this.rpcRegistry = rpcRegistry;
- }
-
- @Override
- public void onRouteChange(RouteChange<RpcRoutingContext, YangInstanceIdentifier> routeChange) {
- Map<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = routeChange.getAnnouncements();
- if(announcements != null && announcements.size() > 0){
- announce(getRouteIdentifiers(announcements));
+ public RoutedRpcListener(final ActorRef rpcRegistry) {
+ Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null");
+ this.rpcRegistry = rpcRegistry;
}
- Map<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = routeChange.getRemovals();
- if(removals != null && removals.size() > 0 ) {
- remove(getRouteIdentifiers(removals));
+ @Override
+ public void onRouteChange(final RouteChange<RpcRoutingContext, YangInstanceIdentifier> routeChange) {
+ final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = routeChange.getAnnouncements();
+ if(announcements != null && announcements.size() > 0){
+ announce(getRouteIdentifiers(announcements));
+ }
+
+ final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = routeChange.getRemovals();
+ if(removals != null && removals.size() > 0 ) {
+ remove(getRouteIdentifiers(removals));
+ }
}
- }
- /**
- *
- * @param announcements
- */
- private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Announcing [{}]", announcements);
+ /**
+ *
+ * @param announcements
+ */
+ private void announce(final Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Announcing [{}]", announcements);
+ }
+ final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg =
+ new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
+ rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
}
- RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
- rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
- }
- /**
- *
- * @param removals
- */
- private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
- if(LOG.isDebugEnabled()) {
- LOG.debug("Removing [{}]", removals);
+ /**
+ *
+ * @param removals
+ */
+ private void remove(final Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Removing [{}]", removals);
+ }
+ final RpcRegistry.Messages.RemoveRoutes removeRpcMsg =
+ new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
+ rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
}
- RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
- rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
- }
- /**
- *
- * @param changes
- * @return
- */
- private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(Map<RpcRoutingContext, Set<YangInstanceIdentifier>> changes) {
- RouteIdentifierImpl routeId = null;
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
+ /**
+ *
+ * @param changes
+ * @return
+ */
+ private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(
+ final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> changes) {
- for (RpcRoutingContext context : changes.keySet()){
- for (YangInstanceIdentifier instanceId : changes.get(context)){
- routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
- routeIdSet.add(routeId);
- }
+ final Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
+ for (final RpcRoutingContext context : changes.keySet()){
+ for (final YangInstanceIdentifier instanceId : changes.get(context)){
+ final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
+ routeIdSet.add(routeId);
+ }
+ }
+ return routeIdSet;
}
- return routeIdSet;
- }
}
package org.opendaylight.controller.remote.rpc;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.japi.Pair;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Future;
-
-import static akka.pattern.Patterns.ask;
-
/**
* Actor to initiate execution of remote RPC on other nodes of the cluster.
*/
public class RpcBroker extends AbstractUntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
- private final Broker.ProviderSession brokerSession;
private final ActorRef rpcRegistry;
- private SchemaContext schemaContext;
private final RemoteRpcProviderConfig config;
+ private final DOMRpcService rpcService;
- private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
- SchemaContext schemaContext) {
- this.brokerSession = brokerSession;
+ private RpcBroker(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+ this.rpcService = rpcService;
this.rpcRegistry = rpcRegistry;
- this.schemaContext = schemaContext;
config = new RemoteRpcProviderConfig(getContext().system().settings().config());
}
- public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
- SchemaContext schemaContext) {
- return Props.create(new RpcBrokerCreator(brokerSession, rpcRegistry, schemaContext));
+ public static Props props(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+ Preconditions.checkNotNull(rpcRegistry, "ActorRef can not be null!");
+ Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
+ return Props.create(new RpcBrokerCreator(rpcService, rpcRegistry));
}
@Override
- protected void handleReceive(Object message) throws Exception {
+ protected void handleReceive(final Object message) throws Exception {
if(message instanceof InvokeRpc) {
invokeRemoteRpc((InvokeRpc) message);
} else if(message instanceof ExecuteRpc) {
executeRpc((ExecuteRpc) message);
- } else if(message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
}
}
- private void updateSchemaContext(UpdateSchemaContext message) {
- this.schemaContext = message.getSchemaContext();
- }
-
private void invokeRemoteRpc(final InvokeRpc msg) {
if(LOG.isDebugEnabled()) {
LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
}
- RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
+ final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
null, msg.getRpc(), msg.getIdentifier());
- RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+ final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
- scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
+ final scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
final ActorRef sender = getSender();
final ActorRef self = self();
- OnComplete<Object> onComplete = new OnComplete<Object>() {
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object reply) throws Throwable {
+ public void onComplete(final Throwable failure, final Object reply) throws Throwable {
if(failure != null) {
LOG.error("FindRouters failed", failure);
sender.tell(new akka.actor.Status.Failure(failure), self);
return;
}
- RpcRegistry.Messages.FindRoutersReply findReply =
+ final RpcRegistry.Messages.FindRoutersReply findReply =
(RpcRegistry.Messages.FindRoutersReply)reply;
- List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
+ final List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
if(actorRefList == null || actorRefList.isEmpty()) {
- String message = String.format(
+ final String message = String.format(
"No remote implementation found for rpc %s", msg.getRpc());
sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
- RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
+ final RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
- ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
- schemaContext), msg.getRpc());
+ final Node serializedNode = NormalizedNodeSerializer.serialize(msg.getInput());
+ final ExecuteRpc executeMsg = new ExecuteRpc(serializedNode, msg.getRpc());
- scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
+ final scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
- OnComplete<Object> onComplete = new OnComplete<Object>() {
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object reply) throws Throwable {
+ public void onComplete(final Throwable failure, final Object reply) throws Throwable {
if(failure != null) {
LOG.error("ExecuteRpc failed", failure);
sender.tell(new akka.actor.Status.Failure(failure), self);
if(LOG.isDebugEnabled()) {
LOG.debug("Executing rpc {}", msg.getRpc());
}
- Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
- XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
- schemaContext));
+ final NormalizedNode<?, ?> input = NormalizedNodeSerializer.deSerialize(msg.getInputNormalizedNode());
+ final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
- ListenableFuture<RpcResult<CompositeNode>> listenableFuture =
+ final ListenableFuture<DOMRpcResult> listenableFuture =
JdkFutureAdapters.listenInPoolThread(future);
final ActorRef sender = getSender();
final ActorRef self = self();
- Futures.addCallback(listenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+ Futures.addCallback(listenableFuture, new FutureCallback<DOMRpcResult>() {
@Override
- public void onSuccess(RpcResult<CompositeNode> result) {
- if(result.isSuccessful()) {
- sender.tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result.getResult(),
- schemaContext)), self);
- } else {
- String message = String.format("Execution of RPC %s failed", msg.getRpc());
+ public void onSuccess(final DOMRpcResult result) {
+ if (result.getErrors() != null && ( ! result.getErrors().isEmpty())) {
+ final String message = String.format("Execution of RPC %s failed", msg.getRpc());
Collection<RpcError> errors = result.getErrors();
if(errors == null || errors.size() == 0) {
errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
message, errors)), self);
+ } else {
+ final Node serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+ sender.tell(new RpcResponse(serializedResultNode), self);
}
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
sender.tell(new akka.actor.Status.Failure(t), self);
}
private static class RpcBrokerCreator implements Creator<RpcBroker> {
private static final long serialVersionUID = 1L;
- final Broker.ProviderSession brokerSession;
+ final DOMRpcService rpcService;
final ActorRef rpcRegistry;
- final SchemaContext schemaContext;
- RpcBrokerCreator(ProviderSession brokerSession, ActorRef rpcRegistry,
- SchemaContext schemaContext) {
- this.brokerSession = brokerSession;
+ RpcBrokerCreator(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+ this.rpcService = rpcService;
this.rpcRegistry = rpcRegistry;
- this.schemaContext = schemaContext;
}
@Override
public RpcBroker create() throws Exception {
- return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
+ return new RpcBroker(rpcService, rpcRegistry);
}
}
}
import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.yangtools.yang.common.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-
-public class RpcListener implements RpcRegistrationListener{
+public class RpcListener implements DOMRpcAvailabilityListener{
private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
private final ActorRef rpcRegistry;
- public RpcListener(ActorRef rpcRegistry) {
+ public RpcListener(final ActorRef rpcRegistry) {
this.rpcRegistry = rpcRegistry;
}
- @Override
- public void onRpcImplementationAdded(QName rpc) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding registration for [{}]", rpc);
+ @Override
+ public void onRpcAvailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
+ Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding registration for [{}]", rpcs);
+ }
+ final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+
+ for (final DOMRpcIdentifier rpc : rpcs) {
+ final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null);
+ routeIds.add(routeId);
+ }
+ final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
+ rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
}
- RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
- List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
- routeIds.add(routeId);
- RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
- rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
- }
- @Override
- public void onRpcImplementationRemoved(QName rpc) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Removing registration for [{}]", rpc);
+ @Override
+ public void onRpcUnavailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
+ Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Removing registration for [{}]", rpcs);
+ }
+ final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+ for (final DOMRpcIdentifier rpc : rpcs) {
+ final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null);
+ routeIds.add(routeId);
+ }
+ final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
+ rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
}
- RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
- List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
- routeIds.add(routeId);
- RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
- rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
- }
}
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.Function;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
-import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RpcManager extends AbstractUntypedActor {
- private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
-
- private SchemaContext schemaContext;
- private ActorRef rpcBroker;
- private ActorRef rpcRegistry;
- private final Broker.ProviderSession brokerSession;
- private final RemoteRpcProviderConfig config;
- private RpcListener rpcListener;
- private RoutedRpcListener routeChangeListener;
- private RemoteRpcImplementation rpcImplementation;
- private final RpcProvisionRegistry rpcProvisionRegistry;
-
- private RpcManager(SchemaContext schemaContext,
- Broker.ProviderSession brokerSession,
- RpcProvisionRegistry rpcProvisionRegistry) {
- this.schemaContext = schemaContext;
- this.brokerSession = brokerSession;
- this.rpcProvisionRegistry = rpcProvisionRegistry;
- this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
-
- createRpcActors();
- startListeners();
- }
-
-
- public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession,
- final RpcProvisionRegistry rpcProvisionRegistry) {
- return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry);
+ private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
+
+ private SchemaContext schemaContext;
+ private ActorRef rpcBroker;
+ private ActorRef rpcRegistry;
+ private final RemoteRpcProviderConfig config;
+ private RpcListener rpcListener;
+ private RoutedRpcListener routeChangeListener;
+ private RemoteRpcImplementation rpcImplementation;
+ private final DOMRpcProviderService rpcProvisionRegistry;
+ private final DOMRpcService rpcServices;
+
+ private RpcManager(final SchemaContext schemaContext,
+ final DOMRpcProviderService rpcProvisionRegistry,
+ final DOMRpcService rpcSevices) {
+ this.schemaContext = schemaContext;
+ this.rpcProvisionRegistry = rpcProvisionRegistry;
+ rpcServices = rpcSevices;
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+
+ createRpcActors();
+ startListeners();
}
- private void createRpcActors() {
- LOG.debug("Create rpc registry and broker actors");
-
- rpcRegistry =
- getContext().actorOf(Props.create(RpcRegistry.class).
- withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
- rpcBroker =
- getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
- withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-
- RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
- rpcRegistry.tell(localRouter, self());
- }
-
- private void startListeners() {
- LOG.debug("Registers rpc listeners");
-
- rpcListener = new RpcListener(rpcRegistry);
- routeChangeListener = new RoutedRpcListener(rpcRegistry);
- rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
-
- brokerSession.addRpcRegistrationListener(rpcListener);
- rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
- rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
- announceSupportedRpcs();
- }
-
- /**
- * Add all the locally registered RPCs in the clustered routing table
- */
- private void announceSupportedRpcs(){
- LOG.debug("Adding all supported rpcs to routing table");
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- rpcListener.onRpcImplementationAdded(rpc);
+
+ public static Props props(final SchemaContext schemaContext,
+ final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
+ Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+ Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+ Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
+ return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
+ }
+
+ private void createRpcActors() {
+ LOG.debug("Create rpc registry and broker actors");
+
+ rpcRegistry =
+ getContext().actorOf(Props.create(RpcRegistry.class).
+ withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+
+ rpcBroker =
+ getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry).
+ withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+
+ final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
+ rpcRegistry.tell(localRouter, self());
}
- }
+ private void startListeners() {
+ LOG.debug("Registers rpc listeners");
+
+ rpcListener = new RpcListener(rpcRegistry);
+ routeChangeListener = new RoutedRpcListener(rpcRegistry);
+ rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
+
+ rpcServices.registerRpcListener(rpcListener);
+
+// rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
+// rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
+ announceSupportedRpcs();
+ }
- @Override
- protected void handleReceive(Object message) throws Exception {
- if(message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
+ /**
+ * Add all the locally registered RPCs in the clustered routing table
+ */
+ private void announceSupportedRpcs(){
+ LOG.debug("Adding all supported rpcs to routing table");
+ final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
+ final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
+ for (final RpcDefinition rpcDef : currentlySupportedRpc) {
+ rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
+ }
+ rpcListener.onRpcAvailable(rpcs);
}
- }
- private void updateSchemaContext(UpdateSchemaContext message) {
- this.schemaContext = message.getSchemaContext();
- rpcBroker.tell(message, ActorRef.noSender());
- }
+ @Override
+ protected void handleReceive(final Object message) throws Exception {
+ if(message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ }
+
+ }
+
+ private void updateSchemaContext(final UpdateSchemaContext message) {
+ schemaContext = message.getSchemaContext();
+ rpcBroker.tell(message, ActorRef.noSender());
+ }
- @Override
- public SupervisorStrategy supervisorStrategy() {
- return new OneForOneStrategy(10, Duration.create("1 minute"),
- new Function<Throwable, SupervisorStrategy.Directive>() {
- @Override
- public SupervisorStrategy.Directive apply(Throwable t) {
- return SupervisorStrategy.resume();
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function<Throwable, SupervisorStrategy.Directive>() {
+ @Override
+ public SupervisorStrategy.Directive apply(final Throwable t) {
+ return SupervisorStrategy.resume();
+ }
}
- }
- );
- }
+ );
+ }
}
import com.google.common.base.Preconditions;
import java.io.Serializable;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.yangtools.yang.common.QName;
public class ExecuteRpc implements Serializable {
private static final long serialVersionUID = 1128904894827335676L;
- private final String inputCompositeNode;
+ private final NormalizedNodeMessages.Node inputNormalizedNode;
private final QName rpc;
- public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
- Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
+ public ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) {
+ Preconditions.checkNotNull(inputNormalizedNode, "Normalized Node input string should be present");
Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
- this.inputCompositeNode = inputCompositeNode;
+ this.inputNormalizedNode = inputNormalizedNode;
this.rpc = rpc;
}
- public String getInputCompositeNode() {
- return inputCompositeNode;
+ public NormalizedNodeMessages.Node getInputNormalizedNode() {
+ return inputNormalizedNode;
}
public QName getRpc() {
import com.google.common.base.Preconditions;
import java.io.Serializable;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class InvokeRpc implements Serializable {
private static final long serialVersionUID = -2813459607858108953L;
private final QName rpc;
private final YangInstanceIdentifier identifier;
- private final CompositeNode input;
+ private final NormalizedNode<?,?> input;
- public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
+ public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final NormalizedNode<?,?> input) {
Preconditions.checkNotNull(rpc, "rpc qname should not be null");
Preconditions.checkNotNull(input, "rpc input should not be null");
return identifier;
}
- public CompositeNode getInput() {
+ public NormalizedNode<?,?> getInput() {
return input;
}
}
package org.opendaylight.controller.remote.rpc.messages;
import java.io.Serializable;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
public class RpcResponse implements Serializable {
private static final long serialVersionUID = -4211279498688989245L;
- private final String resultCompositeNode;
+ private final NormalizedNodeMessages.Node resultNormalizedNode;
- public RpcResponse(final String resultCompositeNode) {
- this.resultCompositeNode = resultCompositeNode;
+ public RpcResponse(final NormalizedNodeMessages.Node inputNormalizedNode) {
+ resultNormalizedNode = inputNormalizedNode;
}
- public String getResultCompositeNode() {
- return resultCompositeNode;
+ public NormalizedNodeMessages.Node getResultNormalizedNode() {
+ return resultNormalizedNode;
}
}
package org.opendaylight.controller.remote.rpc;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.mockito.Mockito;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import java.io.File;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
/**
* Base class for RPC tests.
*
protected JavaTestKit probeReg2;
protected Broker.ProviderSession brokerSession;
protected SchemaContext schemaContext;
+ protected DOMRpcService rpcService;
@BeforeClass
public static void setup() throws InterruptedException {
- RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
- RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+ final RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+ final RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
node1 = ActorSystem.create("opendaylight-rpc", config1.get());
node2 = ActorSystem.create("opendaylight-rpc", config2.get());
}
new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath())));
brokerSession = Mockito.mock(Broker.ProviderSession.class);
+ rpcService = Mockito.mock(DOMRpcService.class);
+
probeReg1 = new JavaTestKit(node1);
- rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext));
+ rpcBroker1 = node1.actorOf(RpcBroker.props(rpcService, probeReg1.getRef()));
probeReg2 = new JavaTestKit(node2);
- rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext));
+ rpcBroker2 = node2.actorOf(RpcBroker.props(rpcService, probeReg2.getRef()));
}
- static void assertRpcErrorEquals(RpcError rpcError, ErrorSeverity severity,
- ErrorType errorType, String tag, String message, String applicationTag, String info,
- String causeMsg) {
+ static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity,
+ final ErrorType errorType, final String tag, final String message, final String applicationTag, final String info,
+ final String causeMsg) {
assertEquals("getSeverity", severity, rpcError.getSeverity());
assertEquals("getErrorType", errorType, rpcError.getErrorType());
assertEquals("getTag", tag, rpcError.getTag());
}
}
- static void assertCompositeNodeEquals(CompositeNode exp, CompositeNode actual) {
- assertEquals("NodeType getNamespace", exp.getNodeType().getNamespace(),
- actual.getNodeType().getNamespace());
- assertEquals("NodeType getLocalName", exp.getNodeType().getLocalName(),
- actual.getNodeType().getLocalName());
- for(Node<?> child: exp.getValue()) {
- List<Node<?>> c = actual.get(child.getNodeType());
- assertNotNull("Missing expected child " + child.getNodeType(), c);
- if(child instanceof CompositeNode) {
- assertCompositeNodeEquals((CompositeNode) child, (CompositeNode)c.get(0));
- } else {
- assertEquals("Value for Node " + child.getNodeType(), child.getValue(),
- c.get(0).getValue());
- }
- }
- }
-
- static CompositeNode makeRPCInput(String data) {
- CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder()
- .setQName(TEST_RPC_INPUT).addLeaf(TEST_RPC_INPUT_DATA, data);
- return ImmutableCompositeNode.create(
- TEST_RPC, ImmutableList.<Node<?>>of(builder.toInstance()));
- }
-
- static CompositeNode makeRPCOutput(String data) {
- CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder()
- .setQName(TEST_RPC_OUTPUT).addLeaf(TEST_RPC_OUTPUT_DATA, data);
- return ImmutableCompositeNode.create(
- TEST_RPC, ImmutableList.<Node<?>>of(builder.toInstance()));
- }
-
- static void assertFailedRpcResult(RpcResult<CompositeNode> rpcResult, ErrorSeverity severity,
- ErrorType errorType, String tag, String message, String applicationTag, String info,
- String causeMsg) {
-
- assertNotNull("RpcResult was null", rpcResult);
- assertEquals("isSuccessful", false, rpcResult.isSuccessful());
- Collection<RpcError> rpcErrors = rpcResult.getErrors();
- assertEquals("RpcErrors count", 1, rpcErrors.size());
- assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message,
- applicationTag, info, causeMsg);
- }
-
- static void assertSuccessfulRpcResult(RpcResult<CompositeNode> rpcResult,
- CompositeNode expOutput) {
-
- assertNotNull("RpcResult was null", rpcResult);
- assertEquals("isSuccessful", true, rpcResult.isSuccessful());
- assertCompositeNodeEquals(expOutput, rpcResult.getResult());
- }
-
static class TestException extends Exception {
private static final long serialVersionUID = 1L;
package org.opendaylight.controller.remote.rpc;
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
-import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.xml.codec.XmlUtils;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
/***
* Unit tests for RemoteRpcImplementation.
*/
public class RemoteRpcImplementationTest extends AbstractRpcTest {
- @Test
- public void testInvokeRpc() throws Exception {
- final AtomicReference<AssertionError> assertError = new AtomicReference<>();
- try {
- RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext, getConfig());
-
- final CompositeNode input = makeRPCInput("foo");
- final CompositeNode output = makeRPCOutput("bar");
- final AtomicReference<InvokeRpc> invokeRpcMsg = setupInvokeRpcReply(assertError, output);
-
- ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
-
- RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
-
- assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0));
-
- assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc());
- assertEquals("getInput", input, invokeRpcMsg.get().getInput());
- } finally {
- if(assertError.get() != null) {
- throw assertError.get();
- }
- }
- }
-
- @Test
- public void testInvokeRpcWithIdentifier() throws Exception {
- final AtomicReference<AssertionError> assertError = new AtomicReference<>();
- try {
- RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext, getConfig());
-
- QName instanceQName = new QName(new URI("ns"), "instance");
- YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName);
-
- CompositeNode input = makeRPCInput("foo");
- CompositeNode output = makeRPCOutput("bar");
- final AtomicReference<InvokeRpc> invokeRpcMsg = setupInvokeRpcReply(assertError, output);
-
- ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(
- TEST_RPC, identifier, input);
-
- RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
-
- assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0));
-
- assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc());
- assertEquals("getInput", input, invokeRpcMsg.get().getInput());
- assertEquals("getRoute", identifier, invokeRpcMsg.get().getIdentifier());
- } finally {
- if(assertError.get() != null) {
- throw assertError.get();
- }
- }
- }
-
- @Test
- public void testInvokeRpcWithRpcErrorsException() throws Exception {
- final AtomicReference<AssertionError> assertError = new AtomicReference<>();
- try {
- RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext, getConfig());
-
- final CompositeNode input = makeRPCInput("foo");
-
- setupInvokeRpcErrorReply(assertError, new RpcErrorsException(
- "mock", Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, "tag",
- "error", "appTag", "info", null))));
-
- ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
-
- RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
-
- assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "tag",
- "error", "appTag", "info", null);
- } finally {
- if(assertError.get() != null) {
- throw assertError.get();
- }
- }
- }
-
- @Test
- public void testInvokeRpcWithOtherException() throws Exception {
- final AtomicReference<AssertionError> assertError = new AtomicReference<>();
- try {
- RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext, getConfig());
-
- final CompositeNode input = makeRPCInput("foo");
-
- setupInvokeRpcErrorReply(assertError, new TestException());
-
- ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
-
- RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
-
- assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "operation-failed",
- TestException.MESSAGE, null, null, TestException.MESSAGE);
- } finally {
- if(assertError.get() != null) {
- throw assertError.get();
- }
- }
- }
-
- private AtomicReference<InvokeRpc> setupInvokeRpcReply(
- final AtomicReference<AssertionError> assertError, final CompositeNode output) {
- return setupInvokeRpcReply(assertError, output, null);
- }
-
- private AtomicReference<InvokeRpc> setupInvokeRpcErrorReply(
- final AtomicReference<AssertionError> assertError, final Exception error) {
- return setupInvokeRpcReply(assertError, null, error);
- }
-
- private AtomicReference<InvokeRpc> setupInvokeRpcReply(
- final AtomicReference<AssertionError> assertError, final CompositeNode output,
- final Exception error) {
- final AtomicReference<InvokeRpc> invokeRpcMsg = new AtomicReference<>();
-
- new Thread() {
- @Override
- public void run() {
- try {
- invokeRpcMsg.set(probeReg1.expectMsgClass(
- JavaTestKit.duration("5 seconds"), InvokeRpc.class));
-
- if(output != null) {
- probeReg1.reply(new RpcResponse(XmlUtils.outputCompositeNodeToXml(
- output, schemaContext)));
- } else {
- probeReg1.reply(new akka.actor.Status.Failure(error));
- }
-
- } catch(AssertionError e) {
- assertError.set(e);
- }
- }
-
- }.start();
-
- return invokeRpcMsg;
- }
private RemoteRpcProviderConfig getConfig(){
return new RemoteRpcProviderConfig.Builder("unit-test").build();
package org.opendaylight.controller.remote.rpc;
-import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.typesafe.config.Config;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Test;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.Await;
-import scala.concurrent.duration.Duration;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class RemoteRpcProviderTest {
@BeforeClass
public static void setup() throws InterruptedException {
moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
- Config config = moduleConfig.get();
+ final Config config = moduleConfig.get();
system = ActorSystem.create("odl-cluster-rpc", config);
}
system = null;
}
- @Test
- public void testRemoteRpcProvider() throws Exception {
- RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(RpcProvisionRegistry.class));
- Broker.ProviderSession session = mock(Broker.ProviderSession.class);
- SchemaService schemaService = mock(SchemaService.class);
- when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
- when(session.getService(SchemaService.class)).thenReturn(schemaService);
-
- rpcProvider.onSessionInitiated(session);
-
- ActorRef actorRef = Await.result(
- system.actorSelection(
- moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
- Duration.create(2, TimeUnit.SECONDS));
-
- Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
- }
}
package org.opendaylight.controller.remote.rpc;
-import akka.actor.ActorRef;
-import akka.japi.Pair;
-import akka.testkit.JavaTestKit;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import static org.junit.Assert.assertEquals;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
-import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import org.opendaylight.controller.xml.codec.XmlUtils;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.any;
public class RpcBrokerTest extends AbstractRpcTest {
- @Test
- public void testInvokeRpcWithNoRemoteActor() throws Exception {
- new JavaTestKit(node1) {{
- CompositeNode input = makeRPCInput("foo");
-
- InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input);
- rpcBroker1.tell(invokeMsg, getRef());
-
- probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
- probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
- Collections.<Pair<ActorRef, Long>>emptyList()));
-
- akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
- akka.actor.Status.Failure.class);
-
- assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
- }};
- }
-
-
- /**
- * This test method invokes and executes the remote rpc
- */
- //@Test
- public void testInvokeRpc() throws URISyntaxException {
- new JavaTestKit(node1) {{
- QName instanceQName = new QName(new URI("ns"), "instance");
-
- CompositeNode invokeRpcResult = makeRPCOutput("bar");
- RpcResult<CompositeNode> rpcResult =
- RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
- ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
- when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
- .thenReturn(Futures.immediateFuture(rpcResult));
-
- // invoke rpc
- CompositeNode input = makeRPCInput("foo");
- YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName);
- InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input);
- rpcBroker1.tell(invokeMsg, getRef());
-
- FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
- assertEquals("getType", TEST_RPC, routeIdentifier.getType());
- assertEquals("getRoute", instanceID, routeIdentifier.getRoute());
-
- probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
- Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
-
- RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
- assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
- XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
- assertCompositeNodeEquals(input, inputCaptor.getValue());
- }};
- }
-
- @Test
- public void testInvokeRpcWithNoOutput() {
- new JavaTestKit(node1) {{
-
- RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>success().build();
- when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
- .thenReturn(Futures.immediateFuture(rpcResult));
-
- InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
- rpcBroker1.tell(invokeMsg, getRef());
-
- probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
- Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
-
- RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
-
- assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode());
- }};
- }
-
- @Test
- public void testInvokeRpcWithExecuteFailure() {
- new JavaTestKit(node1) {{
-
- RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
- .withError(ErrorType.RPC, "tag", "error", "appTag", "info",
- new Exception("mock"))
- .build();
- when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
- .thenReturn(Futures.immediateFuture(rpcResult));
-
- InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
- rpcBroker1.tell(invokeMsg, getRef());
-
- probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
- Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
-
- akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
- akka.actor.Status.Failure.class);
-
- assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
-
- RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
- List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
- assertEquals("RpcErrors count", 1, rpcErrors.size());
- assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag",
- "error", "appTag", "info", "mock");
- }};
- }
-
- @Test
- public void testInvokeRpcWithFindRoutersFailure() {
- new JavaTestKit(node1) {{
-
- InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
- rpcBroker1.tell(invokeMsg, getRef());
-
- probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- probeReg1.reply(new akka.actor.Status.Failure(new TestException()));
-
- akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
- akka.actor.Status.Failure.class);
-
- assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
- }};
- }
-
- @Test
- public void testExecuteRpc() {
- new JavaTestKit(node1) {{
-
- String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
-
- CompositeNode invokeRpcResult = makeRPCOutput("bar");
- RpcResult<CompositeNode> rpcResult =
- RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
- ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
- when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
- .thenReturn(Futures.immediateFuture(rpcResult));
-
- ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
-
- rpcBroker1.tell(executeMsg, getRef());
-
- RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
-
- assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
- XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
- }};
- }
-
- @Test
- public void testExecuteRpcFailureWithRpcErrors() {
- new JavaTestKit(node1) {{
-
- String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
-
- RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
- .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1",
- new Exception("mock"))
- .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null)
- .build();
- when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
- .thenReturn(Futures.immediateFuture(rpcResult));
-
- ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
-
- rpcBroker1.tell(executeMsg, getRef());
-
- akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
- akka.actor.Status.Failure.class);
-
- assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
-
- RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
- List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
- assertEquals("RpcErrors count", 2, rpcErrors.size());
- assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1",
- "error", "appTag1", "info1", "mock");
- assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2",
- "warning", "appTag2", "info2", null);
- }};
- }
-
- @Test
- public void testExecuteRpcFailureWithNoRpcErrors() {
- new JavaTestKit(node1) {{
-
- String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
-
- RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed().build();
- when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
- .thenReturn(Futures.immediateFuture(rpcResult));
-
- ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
-
- rpcBroker1.tell(executeMsg, getRef());
-
- akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
- akka.actor.Status.Failure.class);
-
- assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
-
- RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
- List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
- assertEquals("RpcErrors count", 1, rpcErrors.size());
- assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC,
- "operation-failed", "failed", null, null, null);
- }};
- }
-
- @Test
- public void testExecuteRpcFailureWithException() {
- new JavaTestKit(node1) {{
-
- String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
-
- when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
- .thenReturn(Futures.<RpcResult<CompositeNode>>immediateFailedFuture(
- new TestException()));
-
- ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
-
- rpcBroker1.tell(executeMsg, getRef());
-
- akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
- akka.actor.Status.Failure.class);
- assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
- }};
- }
}
package org.opendaylight.controller.remote.rpc;
-import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.typesafe.config.ConfigFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.yangtools.yang.common.QName;
-
-import java.net.URI;
-import java.net.URISyntaxException;
public class RpcListenerTest {
system = null;
}
- @Test
- public void testRpcAdd() throws URISyntaxException {
- new JavaTestKit(system) {
- {
- JavaTestKit probeReg = new JavaTestKit(system);
- ActorRef rpcRegistry = probeReg.getRef();
-
- RpcListener rpcListener = new RpcListener(rpcRegistry);
-
- QName qName = new QName(new URI("actor2"), "actor2");
-
- rpcListener.onRpcImplementationAdded(qName);
- probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class);
- }};
-
- }
-
- @Test
- public void testRpcRemove() throws URISyntaxException {
- new JavaTestKit(system) {
- {
- JavaTestKit probeReg = new JavaTestKit(system);
- ActorRef rpcRegistry = probeReg.getRef();
-
- RpcListener rpcListener = new RpcListener(rpcRegistry);
-
- QName qName = new QName(new URI("actor2"), "actor2");
-
- rpcListener.onRpcImplementationRemoved(qName);
- probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class);
- }};
-
- }
}