From 86024f5f5c072e38b51e8367226ad166e3f42d0f Mon Sep 17 00:00:00 2001 From: Jan Hajnar Date: Wed, 18 Mar 2015 17:37:56 +0100 Subject: [PATCH] BUG 2412 - remove CompositeNode from sal-remoterpc-connector * akka remote RPC distribution needs move to new DOM API * remove all tests which are use CompositeNode. Test has to be rewriten with NormalizedNodes ASAP. Change-Id: I8863f5c9657992d23be67acb2f542b0fda81db8c Signed-off-by: Vaclav Demcak Signed-off-by: Jan Hajnar --- .../remote/rpc/RemoteRpcImplementation.java | 98 +++---- .../remote/rpc/RemoteRpcProvider.java | 45 +-- .../remote/rpc/RemoteRpcProviderFactory.java | 12 +- .../remote/rpc/RouteIdentifierImpl.java | 16 +- .../remote/rpc/RoutedRpcListener.java | 114 ++++---- .../controller/remote/rpc/RpcBroker.java | 117 ++++---- .../controller/remote/rpc/RpcListener.java | 63 +++-- .../controller/remote/rpc/RpcManager.java | 184 ++++++------ .../remote/rpc/messages/ExecuteRpc.java | 13 +- .../remote/rpc/messages/InvokeRpc.java | 8 +- .../remote/rpc/messages/RpcResponse.java | 11 +- .../remote/rpc/AbstractRpcTest.java | 92 ++---- .../rpc/RemoteRpcImplementationTest.java | 165 ----------- .../remote/rpc/RemoteRpcProviderTest.java | 33 +-- .../controller/remote/rpc/RpcBrokerTest.java | 261 ------------------ .../remote/rpc/RpcListenerTest.java | 40 --- 16 files changed, 360 insertions(+), 912 deletions(-) diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index 0f84abb22e..360ac68a51 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -1,95 +1,87 @@ package org.opendaylight.controller.remote.rpc; +import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.dispatch.OnComplete; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.controller.xml.codec.XmlUtils; -import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; -import java.util.Collections; -import java.util.Set; - -import static akka.pattern.Patterns.ask; - -public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation { +public class RemoteRpcImplementation implements DOMRpcImplementation { private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class); private final ActorRef rpcBroker; - private final SchemaContext schemaContext; private final RemoteRpcProviderConfig config; - public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext, RemoteRpcProviderConfig config) { + public RemoteRpcImplementation(final ActorRef rpcBroker, final RemoteRpcProviderConfig config) { this.rpcBroker = rpcBroker; - this.schemaContext = schemaContext; this.config = config; } @Override - public ListenableFuture> invokeRpc(QName rpc, - YangInstanceIdentifier identifier, CompositeNode input) { - InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input); - - return executeMsg(rpcMsg); - } - - @Override - public Set getSupportedRpcs() { - // TODO : check if we need to get this from routing registry - return Collections.emptySet(); - } + public CheckedFuture invokeRpc(final DOMRpcIdentifier rpc, final NormalizedNode input) { + final InvokeRpc rpcMsg = new InvokeRpc(rpc.getType().getLastComponent(), rpc.getContextReference(), input); - @Override - public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { - InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input); - return executeMsg(rpcMsg); - } + final SettableFuture settableFuture = SettableFuture.create(); - private ListenableFuture> executeMsg(InvokeRpc rpcMsg) { + final ListenableFuture listenableFuture = + JdkFutureAdapters.listenInPoolThread(settableFuture); - final SettableFuture> listenableFuture = SettableFuture.create(); + final scala.concurrent.Future future = ask(rpcBroker, rpcMsg, config.getAskDuration()); - scala.concurrent.Future future = ask(rpcBroker, rpcMsg, config.getAskDuration()); - - OnComplete onComplete = new OnComplete() { + final OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Object reply) throws Throwable { + public void onComplete(final Throwable failure, final Object reply) throws Throwable { if(failure != null) { LOG.error("InvokeRpc failed", failure); - RpcResult rpcResult; - if(failure instanceof RpcErrorsException) { - rpcResult = RpcResultBuilder.failed().withRpcErrors( - ((RpcErrorsException)failure).getRpcErrors()).build(); - } else { - rpcResult = RpcResultBuilder.failed().withError( - ErrorType.RPC, failure.getMessage(), failure).build(); + final String message = String.format("Execution of RPC %s failed", rpcMsg.getRpc()); + Collection errors = ((RpcErrorsException)failure).getRpcErrors(); + if(errors == null || errors.size() == 0) { + errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, null, message)); } + final DOMRpcResult rpcResult = new DefaultDOMRpcResult(errors); - listenableFuture.set(rpcResult); + settableFuture.set(rpcResult); return; } - RpcResponse rpcReply = (RpcResponse)reply; - CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode()); - listenableFuture.set(RpcResultBuilder.success(result).build()); + final RpcResponse rpcReply = (RpcResponse)reply; + final NormalizedNode result = + NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode()); + settableFuture.set(new DefaultDOMRpcResult(result)); } }; - future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global()); - return listenableFuture; + future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global()); + // FIXME find non blocking way for implementation + try { + return Futures.immediateCheckedFuture(listenableFuture.get()); + } + catch (InterruptedException | ExecutionException e) { + LOG.debug("Unexpected remote RPC exception.", e); + return Futures.immediateFailedCheckedFuture((DOMRpcException) new DOMRpcImplementationNotAvailableException(e, "Unexpected remote RPC exception")); + } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java index d24ed5651a..a1b6286a59 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java @@ -11,18 +11,19 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import java.util.Collection; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.Provider; -import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.controller.sal.core.api.model.SchemaService; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; - /** * This is the base class which initialize all the actors, listeners and * default RPc implementation so remote invocation of rpcs. @@ -31,30 +32,37 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class); - private final RpcProvisionRegistry rpcProvisionRegistry; + private final DOMRpcProviderService rpcProvisionRegistry; + private ListenerRegistration schemaListenerRegistration; private ActorSystem actorSystem; private Broker.ProviderSession brokerSession; private SchemaContext schemaContext; private ActorRef rpcManager; - private RemoteRpcProviderConfig config; + private final RemoteRpcProviderConfig config; - public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) { + public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry) { this.actorSystem = actorSystem; this.rpcProvisionRegistry = rpcProvisionRegistry; - this.config = new RemoteRpcProviderConfig(actorSystem.settings().config()); + config = new RemoteRpcProviderConfig(actorSystem.settings().config()); } @Override public void close() throws Exception { - if (this.actorSystem != null) - this.actorSystem.shutdown(); + if (actorSystem != null) { + actorSystem.shutdown(); + actorSystem = null; + } + if (schemaListenerRegistration != null) { + schemaListenerRegistration.close(); + schemaListenerRegistration = null; + } } @Override - public void onSessionInitiated(Broker.ProviderSession session) { - this.brokerSession = session; + public void onSessionInitiated(final Broker.ProviderSession session) { + brokerSession = session; start(); } @@ -66,21 +74,18 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext private void start() { LOG.info("Starting remote rpc service..."); - SchemaService schemaService = brokerSession.getService(SchemaService.class); + final SchemaService schemaService = brokerSession.getService(SchemaService.class); + final DOMRpcService rpcService = brokerSession.getService(DOMRpcService.class); schemaContext = schemaService.getGlobalContext(); - - rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), - config.getRpcManagerName()); - + rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, + rpcProvisionRegistry, rpcService), config.getRpcManagerName()); + schemaListenerRegistration = schemaService.registerSchemaContextListener(this); LOG.debug("rpc manager started"); - - schemaService.registerSchemaContextListener(this); } @Override - public void onGlobalContextUpdated(SchemaContext schemaContext) { + public void onGlobalContextUpdated(final SchemaContext schemaContext) { this.schemaContext = schemaContext; rpcManager.tell(new UpdateSchemaContext(schemaContext), null); - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java index c82a72eaa5..d17242ed60 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java @@ -11,8 +11,8 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorSystem; import akka.osgi.BundleDelegatingClassLoader; import com.typesafe.config.Config; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,21 +23,21 @@ public class RemoteRpcProviderFactory { public static RemoteRpcProvider createInstance( final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){ - RemoteRpcProvider rpcProvider = - new RemoteRpcProvider(createActorSystem(bundleContext, config), (RpcProvisionRegistry) broker); + final RemoteRpcProvider rpcProvider = + new RemoteRpcProvider(createActorSystem(bundleContext, config), (DOMRpcProviderService) broker); broker.registerProvider(rpcProvider); return rpcProvider; } - private static ActorSystem createActorSystem(BundleContext bundleContext, RemoteRpcProviderConfig config){ + private static ActorSystem createActorSystem(final BundleContext bundleContext, final RemoteRpcProviderConfig config){ // Create an OSGi bundle classloader for actor system - BundleDelegatingClassLoader classLoader = + final BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(), Thread.currentThread().getContextClassLoader()); - Config actorSystemConfig = config.get(); + final Config actorSystemConfig = config.get(); if(LOG.isDebugEnabled()) { LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java index 85d21381b6..99e812193c 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java @@ -8,13 +8,11 @@ package org.opendaylight.controller.remote.rpc; import com.google.common.base.Preconditions; +import java.io.Serializable; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; - -import java.io.Serializable; - public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier,Serializable { private static final long serialVersionUID = 1L; @@ -31,26 +29,26 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier{ - private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class); - private final ActorRef rpcRegistry; + private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class); + private final ActorRef rpcRegistry; - public RoutedRpcListener(ActorRef rpcRegistry) { - Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null"); - - this.rpcRegistry = rpcRegistry; - } - - @Override - public void onRouteChange(RouteChange routeChange) { - Map> announcements = routeChange.getAnnouncements(); - if(announcements != null && announcements.size() > 0){ - announce(getRouteIdentifiers(announcements)); + public RoutedRpcListener(final ActorRef rpcRegistry) { + Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null"); + this.rpcRegistry = rpcRegistry; } - Map> removals = routeChange.getRemovals(); - if(removals != null && removals.size() > 0 ) { - remove(getRouteIdentifiers(removals)); + @Override + public void onRouteChange(final RouteChange routeChange) { + final Map> announcements = routeChange.getAnnouncements(); + if(announcements != null && announcements.size() > 0){ + announce(getRouteIdentifiers(announcements)); + } + + final Map> removals = routeChange.getRemovals(); + if(removals != null && removals.size() > 0 ) { + remove(getRouteIdentifiers(removals)); + } } - } - /** - * - * @param announcements - */ - private void announce(Set> announcements) { - if(LOG.isDebugEnabled()) { - LOG.debug("Announcing [{}]", announcements); + /** + * + * @param announcements + */ + private void announce(final Set> announcements) { + if(LOG.isDebugEnabled()) { + LOG.debug("Announcing [{}]", announcements); + } + final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = + new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements)); + rpcRegistry.tell(addRpcMsg, ActorRef.noSender()); } - RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements)); - rpcRegistry.tell(addRpcMsg, ActorRef.noSender()); - } - /** - * - * @param removals - */ - private void remove(Set> removals){ - if(LOG.isDebugEnabled()) { - LOG.debug("Removing [{}]", removals); + /** + * + * @param removals + */ + private void remove(final Set> removals){ + if(LOG.isDebugEnabled()) { + LOG.debug("Removing [{}]", removals); + } + final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = + new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals)); + rpcRegistry.tell(removeRpcMsg, ActorRef.noSender()); } - RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals)); - rpcRegistry.tell(removeRpcMsg, ActorRef.noSender()); - } - /** - * - * @param changes - * @return - */ - private Set> getRouteIdentifiers(Map> changes) { - RouteIdentifierImpl routeId = null; - Set> routeIdSet = new HashSet<>(); + /** + * + * @param changes + * @return + */ + private Set> getRouteIdentifiers( + final Map> changes) { - for (RpcRoutingContext context : changes.keySet()){ - for (YangInstanceIdentifier instanceId : changes.get(context)){ - routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId); - routeIdSet.add(routeId); - } + final Set> routeIdSet = new HashSet<>(); + for (final RpcRoutingContext context : changes.keySet()){ + for (final YangInstanceIdentifier instanceId : changes.get(context)){ + final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId); + routeIdSet.add(routeId); + } + } + return routeIdSet; } - return routeIdSet; - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 31aac92051..7ddac673c2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -8,45 +8,42 @@ package org.opendaylight.controller.remote.rpc; +import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.actor.Props; import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Pair; - +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; - +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Future; - -import static akka.pattern.Patterns.ask; - /** * Actor to initiate execution of remote RPC on other nodes of the cluster. */ @@ -54,68 +51,60 @@ import static akka.pattern.Patterns.ask; public class RpcBroker extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class); - private final Broker.ProviderSession brokerSession; private final ActorRef rpcRegistry; - private SchemaContext schemaContext; private final RemoteRpcProviderConfig config; + private final DOMRpcService rpcService; - private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, - SchemaContext schemaContext) { - this.brokerSession = brokerSession; + private RpcBroker(final DOMRpcService rpcService, final ActorRef rpcRegistry) { + this.rpcService = rpcService; this.rpcRegistry = rpcRegistry; - this.schemaContext = schemaContext; config = new RemoteRpcProviderConfig(getContext().system().settings().config()); } - public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, - SchemaContext schemaContext) { - return Props.create(new RpcBrokerCreator(brokerSession, rpcRegistry, schemaContext)); + public static Props props(final DOMRpcService rpcService, final ActorRef rpcRegistry) { + Preconditions.checkNotNull(rpcRegistry, "ActorRef can not be null!"); + Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null"); + return Props.create(new RpcBrokerCreator(rpcService, rpcRegistry)); } @Override - protected void handleReceive(Object message) throws Exception { + protected void handleReceive(final Object message) throws Exception { if(message instanceof InvokeRpc) { invokeRemoteRpc((InvokeRpc) message); } else if(message instanceof ExecuteRpc) { executeRpc((ExecuteRpc) message); - } else if(message instanceof UpdateSchemaContext) { - updateSchemaContext((UpdateSchemaContext) message); } } - private void updateSchemaContext(UpdateSchemaContext message) { - this.schemaContext = message.getSchemaContext(); - } - private void invokeRemoteRpc(final InvokeRpc msg) { if(LOG.isDebugEnabled()) { LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); } - RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl( + final RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl( null, msg.getRpc(), msg.getIdentifier()); - RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); + final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); - scala.concurrent.Future future = ask(rpcRegistry, findMsg, config.getAskDuration()); + final scala.concurrent.Future future = ask(rpcRegistry, findMsg, config.getAskDuration()); final ActorRef sender = getSender(); final ActorRef self = self(); - OnComplete onComplete = new OnComplete() { + final OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Object reply) throws Throwable { + public void onComplete(final Throwable failure, final Object reply) throws Throwable { if(failure != null) { LOG.error("FindRouters failed", failure); sender.tell(new akka.actor.Status.Failure(failure), self); return; } - RpcRegistry.Messages.FindRoutersReply findReply = + final RpcRegistry.Messages.FindRoutersReply findReply = (RpcRegistry.Messages.FindRoutersReply)reply; - List> actorRefList = findReply.getRouterWithUpdateTime(); + final List> actorRefList = findReply.getRouterWithUpdateTime(); if(actorRefList == null || actorRefList.isEmpty()) { - String message = String.format( + final String message = String.format( "No remote implementation found for rpc %s", msg.getRpc()); sender.tell(new akka.actor.Status.Failure(new RpcErrorsException( message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, @@ -133,16 +122,16 @@ public class RpcBroker extends AbstractUntypedActor { protected void finishInvokeRpc(final List> actorRefList, final InvokeRpc msg, final ActorRef sender, final ActorRef self) { - RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList); + final RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList); - ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), - schemaContext), msg.getRpc()); + final Node serializedNode = NormalizedNodeSerializer.serialize(msg.getInput()); + final ExecuteRpc executeMsg = new ExecuteRpc(serializedNode, msg.getRpc()); - scala.concurrent.Future future = ask(logic.select(), executeMsg, config.getAskDuration()); + final scala.concurrent.Future future = ask(logic.select(), executeMsg, config.getAskDuration()); - OnComplete onComplete = new OnComplete() { + final OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Object reply) throws Throwable { + public void onComplete(final Throwable failure, final Object reply) throws Throwable { if(failure != null) { LOG.error("ExecuteRpc failed", failure); sender.tell(new akka.actor.Status.Failure(failure), self); @@ -160,24 +149,22 @@ public class RpcBroker extends AbstractUntypedActor { if(LOG.isDebugEnabled()) { LOG.debug("Executing rpc {}", msg.getRpc()); } - Future> future = brokerSession.rpc(msg.getRpc(), - XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), - schemaContext)); + final NormalizedNode input = NormalizedNodeSerializer.deSerialize(msg.getInputNormalizedNode()); + final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc()); + + final CheckedFuture future = rpcService.invokeRpc(schemaPath, input); - ListenableFuture> listenableFuture = + final ListenableFuture listenableFuture = JdkFutureAdapters.listenInPoolThread(future); final ActorRef sender = getSender(); final ActorRef self = self(); - Futures.addCallback(listenableFuture, new FutureCallback>() { + Futures.addCallback(listenableFuture, new FutureCallback() { @Override - public void onSuccess(RpcResult result) { - if(result.isSuccessful()) { - sender.tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result.getResult(), - schemaContext)), self); - } else { - String message = String.format("Execution of RPC %s failed", msg.getRpc()); + public void onSuccess(final DOMRpcResult result) { + if (result.getErrors() != null && ( ! result.getErrors().isEmpty())) { + final String message = String.format("Execution of RPC %s failed", msg.getRpc()); Collection errors = result.getErrors(); if(errors == null || errors.size() == 0) { errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, @@ -186,11 +173,14 @@ public class RpcBroker extends AbstractUntypedActor { sender.tell(new akka.actor.Status.Failure(new RpcErrorsException( message, errors)), self); + } else { + final Node serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult()); + sender.tell(new RpcResponse(serializedResultNode), self); } } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable t) { LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t); sender.tell(new akka.actor.Status.Failure(t), self); } @@ -200,20 +190,17 @@ public class RpcBroker extends AbstractUntypedActor { private static class RpcBrokerCreator implements Creator { private static final long serialVersionUID = 1L; - final Broker.ProviderSession brokerSession; + final DOMRpcService rpcService; final ActorRef rpcRegistry; - final SchemaContext schemaContext; - RpcBrokerCreator(ProviderSession brokerSession, ActorRef rpcRegistry, - SchemaContext schemaContext) { - this.brokerSession = brokerSession; + RpcBrokerCreator(final DOMRpcService rpcService, final ActorRef rpcRegistry) { + this.rpcService = rpcService; this.rpcRegistry = rpcRegistry; - this.schemaContext = schemaContext; } @Override public RpcBroker create() throws Exception { - return new RpcBroker(brokerSession, rpcRegistry, schemaContext); + return new RpcBroker(rpcService, rpcRegistry); } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java index 22879dda2f..28ff1523cb 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java @@ -10,46 +10,55 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; -import org.opendaylight.yangtools.yang.common.QName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; - -public class RpcListener implements RpcRegistrationListener{ +public class RpcListener implements DOMRpcAvailabilityListener{ private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class); private final ActorRef rpcRegistry; - public RpcListener(ActorRef rpcRegistry) { + public RpcListener(final ActorRef rpcRegistry) { this.rpcRegistry = rpcRegistry; } - @Override - public void onRpcImplementationAdded(QName rpc) { - if(LOG.isDebugEnabled()) { - LOG.debug("Adding registration for [{}]", rpc); + @Override + public void onRpcAvailable(@Nonnull final Collection rpcs) { + Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null."); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding registration for [{}]", rpcs); + } + final List> routeIds = new ArrayList<>(); + + for (final DOMRpcIdentifier rpc : rpcs) { + final RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null); + routeIds.add(routeId); + } + final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds); + rpcRegistry.tell(addRpcMsg, ActorRef.noSender()); } - RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc, null); - List> routeIds = new ArrayList<>(); - routeIds.add(routeId); - RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds); - rpcRegistry.tell(addRpcMsg, ActorRef.noSender()); - } - @Override - public void onRpcImplementationRemoved(QName rpc) { - if(LOG.isDebugEnabled()) { - LOG.debug("Removing registration for [{}]", rpc); + @Override + public void onRpcUnavailable(@Nonnull final Collection rpcs) { + Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null."); + if(LOG.isDebugEnabled()) { + LOG.debug("Removing registration for [{}]", rpcs); + } + final List> routeIds = new ArrayList<>(); + for (final DOMRpcIdentifier rpc : rpcs) { + final RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null); + routeIds.add(routeId); + } + final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds); + rpcRegistry.tell(removeRpcMsg, ActorRef.noSender()); } - RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc, null); - List> routeIds = new ArrayList<>(); - routeIds.add(routeId); - RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds); - rpcRegistry.tell(removeRpcMsg, ActorRef.noSender()); - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index 4cbce63f9a..f12fda0aa1 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -14,13 +14,17 @@ import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.japi.Function; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; -import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,98 +38,104 @@ import scala.concurrent.duration.Duration; public class RpcManager extends AbstractUntypedActor { - private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class); - - private SchemaContext schemaContext; - private ActorRef rpcBroker; - private ActorRef rpcRegistry; - private final Broker.ProviderSession brokerSession; - private final RemoteRpcProviderConfig config; - private RpcListener rpcListener; - private RoutedRpcListener routeChangeListener; - private RemoteRpcImplementation rpcImplementation; - private final RpcProvisionRegistry rpcProvisionRegistry; - - private RpcManager(SchemaContext schemaContext, - Broker.ProviderSession brokerSession, - RpcProvisionRegistry rpcProvisionRegistry) { - this.schemaContext = schemaContext; - this.brokerSession = brokerSession; - this.rpcProvisionRegistry = rpcProvisionRegistry; - this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); - - createRpcActors(); - startListeners(); - } - - - public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession, - final RpcProvisionRegistry rpcProvisionRegistry) { - return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry); + private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class); + + private SchemaContext schemaContext; + private ActorRef rpcBroker; + private ActorRef rpcRegistry; + private final RemoteRpcProviderConfig config; + private RpcListener rpcListener; + private RoutedRpcListener routeChangeListener; + private RemoteRpcImplementation rpcImplementation; + private final DOMRpcProviderService rpcProvisionRegistry; + private final DOMRpcService rpcServices; + + private RpcManager(final SchemaContext schemaContext, + final DOMRpcProviderService rpcProvisionRegistry, + final DOMRpcService rpcSevices) { + this.schemaContext = schemaContext; + this.rpcProvisionRegistry = rpcProvisionRegistry; + rpcServices = rpcSevices; + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + + createRpcActors(); + startListeners(); } - private void createRpcActors() { - LOG.debug("Create rpc registry and broker actors"); - - rpcRegistry = - getContext().actorOf(Props.create(RpcRegistry.class). - withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); - - rpcBroker = - getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext). - withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); - - RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); - rpcRegistry.tell(localRouter, self()); - } - - private void startListeners() { - LOG.debug("Registers rpc listeners"); - - rpcListener = new RpcListener(rpcRegistry); - routeChangeListener = new RoutedRpcListener(rpcRegistry); - rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config); - - brokerSession.addRpcRegistrationListener(rpcListener); - rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); - rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation); - announceSupportedRpcs(); - } - - /** - * Add all the locally registered RPCs in the clustered routing table - */ - private void announceSupportedRpcs(){ - LOG.debug("Adding all supported rpcs to routing table"); - Set currentlySupported = brokerSession.getSupportedRpcs(); - for (QName rpc : currentlySupported) { - rpcListener.onRpcImplementationAdded(rpc); + + public static Props props(final SchemaContext schemaContext, + final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) { + Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!"); + Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!"); + Preconditions.checkNotNull(rpcServices, "RpcService can not be null!"); + return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices); + } + + private void createRpcActors() { + LOG.debug("Create rpc registry and broker actors"); + + rpcRegistry = + getContext().actorOf(Props.create(RpcRegistry.class). + withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); + + rpcBroker = + getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry). + withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); + + final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); + rpcRegistry.tell(localRouter, self()); } - } + private void startListeners() { + LOG.debug("Registers rpc listeners"); + + rpcListener = new RpcListener(rpcRegistry); + routeChangeListener = new RoutedRpcListener(rpcRegistry); + rpcImplementation = new RemoteRpcImplementation(rpcBroker, config); + + rpcServices.registerRpcListener(rpcListener); + +// rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); +// rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation); + announceSupportedRpcs(); + } - @Override - protected void handleReceive(Object message) throws Exception { - if(message instanceof UpdateSchemaContext) { - updateSchemaContext((UpdateSchemaContext) message); + /** + * Add all the locally registered RPCs in the clustered routing table + */ + private void announceSupportedRpcs(){ + LOG.debug("Adding all supported rpcs to routing table"); + final Set currentlySupportedRpc = schemaContext.getOperations(); + final List rpcs = new ArrayList<>(); + for (final RpcDefinition rpcDef : currentlySupportedRpc) { + rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath())); + } + rpcListener.onRpcAvailable(rpcs); } - } - private void updateSchemaContext(UpdateSchemaContext message) { - this.schemaContext = message.getSchemaContext(); - rpcBroker.tell(message, ActorRef.noSender()); - } + @Override + protected void handleReceive(final Object message) throws Exception { + if(message instanceof UpdateSchemaContext) { + updateSchemaContext((UpdateSchemaContext) message); + } + + } + + private void updateSchemaContext(final UpdateSchemaContext message) { + schemaContext = message.getSchemaContext(); + rpcBroker.tell(message, ActorRef.noSender()); + } - @Override - public SupervisorStrategy supervisorStrategy() { - return new OneForOneStrategy(10, Duration.create("1 minute"), - new Function() { - @Override - public SupervisorStrategy.Directive apply(Throwable t) { - return SupervisorStrategy.resume(); + @Override + public SupervisorStrategy supervisorStrategy() { + return new OneForOneStrategy(10, Duration.create("1 minute"), + new Function() { + @Override + public SupervisorStrategy.Directive apply(final Throwable t) { + return SupervisorStrategy.resume(); + } } - } - ); - } + ); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java index 5d780be641..66c0c1b6f0 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java @@ -10,24 +10,25 @@ package org.opendaylight.controller.remote.rpc.messages; import com.google.common.base.Preconditions; import java.io.Serializable; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.yangtools.yang.common.QName; public class ExecuteRpc implements Serializable { private static final long serialVersionUID = 1128904894827335676L; - private final String inputCompositeNode; + private final NormalizedNodeMessages.Node inputNormalizedNode; private final QName rpc; - public ExecuteRpc(final String inputCompositeNode, final QName rpc) { - Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present"); + public ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) { + Preconditions.checkNotNull(inputNormalizedNode, "Normalized Node input string should be present"); Preconditions.checkNotNull(rpc, "rpc Qname should not be null"); - this.inputCompositeNode = inputCompositeNode; + this.inputNormalizedNode = inputNormalizedNode; this.rpc = rpc; } - public String getInputCompositeNode() { - return inputCompositeNode; + public NormalizedNodeMessages.Node getInputNormalizedNode() { + return inputNormalizedNode; } public QName getRpc() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java index 9c40dbfc58..a7fbe8305e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java @@ -10,17 +10,17 @@ package org.opendaylight.controller.remote.rpc.messages; import com.google.common.base.Preconditions; import java.io.Serializable; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class InvokeRpc implements Serializable { private static final long serialVersionUID = -2813459607858108953L; private final QName rpc; private final YangInstanceIdentifier identifier; - private final CompositeNode input; + private final NormalizedNode input; - public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) { + public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final NormalizedNode input) { Preconditions.checkNotNull(rpc, "rpc qname should not be null"); Preconditions.checkNotNull(input, "rpc input should not be null"); @@ -37,7 +37,7 @@ public class InvokeRpc implements Serializable { return identifier; } - public CompositeNode getInput() { + public NormalizedNode getInput() { return input; } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java index e6b208cb6f..e5cb488d97 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java @@ -8,17 +8,18 @@ package org.opendaylight.controller.remote.rpc.messages; import java.io.Serializable; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; public class RpcResponse implements Serializable { private static final long serialVersionUID = -4211279498688989245L; - private final String resultCompositeNode; + private final NormalizedNodeMessages.Node resultNormalizedNode; - public RpcResponse(final String resultCompositeNode) { - this.resultCompositeNode = resultCompositeNode; + public RpcResponse(final NormalizedNodeMessages.Node inputNormalizedNode) { + resultNormalizedNode = inputNormalizedNode; } - public String getResultCompositeNode() { - return resultCompositeNode; + public NormalizedNodeMessages.Node getResultNormalizedNode() { + return resultNormalizedNode; } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java index 46406fd4fe..afe81a8800 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java @@ -8,38 +8,28 @@ package org.opendaylight.controller.remote.rpc; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; -import com.google.common.collect.ImmutableList; +import java.io.File; +import java.net.URI; +import java.util.Arrays; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.mockito.Mockito; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.Node; -import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; -import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; -import java.io.File; -import java.net.URI; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - /** * Base class for RPC tests. * @@ -64,11 +54,12 @@ public class AbstractRpcTest { protected JavaTestKit probeReg2; protected Broker.ProviderSession brokerSession; protected SchemaContext schemaContext; + protected DOMRpcService rpcService; @BeforeClass public static void setup() throws InterruptedException { - RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); - RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); + final RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); + final RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); node1 = ActorSystem.create("opendaylight-rpc", config1.get()); node2 = ActorSystem.create("opendaylight-rpc", config2.get()); } @@ -87,16 +78,18 @@ public class AbstractRpcTest { new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath()))); brokerSession = Mockito.mock(Broker.ProviderSession.class); + rpcService = Mockito.mock(DOMRpcService.class); + probeReg1 = new JavaTestKit(node1); - rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext)); + rpcBroker1 = node1.actorOf(RpcBroker.props(rpcService, probeReg1.getRef())); probeReg2 = new JavaTestKit(node2); - rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext)); + rpcBroker2 = node2.actorOf(RpcBroker.props(rpcService, probeReg2.getRef())); } - static void assertRpcErrorEquals(RpcError rpcError, ErrorSeverity severity, - ErrorType errorType, String tag, String message, String applicationTag, String info, - String causeMsg) { + static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity, + final ErrorType errorType, final String tag, final String message, final String applicationTag, final String info, + final String causeMsg) { assertEquals("getSeverity", severity, rpcError.getSeverity()); assertEquals("getErrorType", errorType, rpcError.getErrorType()); assertEquals("getTag", tag, rpcError.getTag()); @@ -111,57 +104,6 @@ public class AbstractRpcTest { } } - static void assertCompositeNodeEquals(CompositeNode exp, CompositeNode actual) { - assertEquals("NodeType getNamespace", exp.getNodeType().getNamespace(), - actual.getNodeType().getNamespace()); - assertEquals("NodeType getLocalName", exp.getNodeType().getLocalName(), - actual.getNodeType().getLocalName()); - for(Node child: exp.getValue()) { - List> c = actual.get(child.getNodeType()); - assertNotNull("Missing expected child " + child.getNodeType(), c); - if(child instanceof CompositeNode) { - assertCompositeNodeEquals((CompositeNode) child, (CompositeNode)c.get(0)); - } else { - assertEquals("Value for Node " + child.getNodeType(), child.getValue(), - c.get(0).getValue()); - } - } - } - - static CompositeNode makeRPCInput(String data) { - CompositeNodeBuilder builder = ImmutableCompositeNode.builder() - .setQName(TEST_RPC_INPUT).addLeaf(TEST_RPC_INPUT_DATA, data); - return ImmutableCompositeNode.create( - TEST_RPC, ImmutableList.>of(builder.toInstance())); - } - - static CompositeNode makeRPCOutput(String data) { - CompositeNodeBuilder builder = ImmutableCompositeNode.builder() - .setQName(TEST_RPC_OUTPUT).addLeaf(TEST_RPC_OUTPUT_DATA, data); - return ImmutableCompositeNode.create( - TEST_RPC, ImmutableList.>of(builder.toInstance())); - } - - static void assertFailedRpcResult(RpcResult rpcResult, ErrorSeverity severity, - ErrorType errorType, String tag, String message, String applicationTag, String info, - String causeMsg) { - - assertNotNull("RpcResult was null", rpcResult); - assertEquals("isSuccessful", false, rpcResult.isSuccessful()); - Collection rpcErrors = rpcResult.getErrors(); - assertEquals("RpcErrors count", 1, rpcErrors.size()); - assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message, - applicationTag, info, causeMsg); - } - - static void assertSuccessfulRpcResult(RpcResult rpcResult, - CompositeNode expOutput) { - - assertNotNull("RpcResult was null", rpcResult); - assertEquals("isSuccessful", true, rpcResult.isSuccessful()); - assertCompositeNodeEquals(expOutput, rpcResult.getResult()); - } - static class TestException extends Exception { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java index 49451dd0db..2026d48a81 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java @@ -8,26 +8,6 @@ package org.opendaylight.controller.remote.rpc; -import akka.testkit.JavaTestKit; -import com.google.common.util.concurrent.ListenableFuture; -import org.junit.Test; -import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; -import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.xml.codec.XmlUtils; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; - -import java.net.URI; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; /*** * Unit tests for RemoteRpcImplementation. @@ -36,151 +16,6 @@ import static org.junit.Assert.assertEquals; */ public class RemoteRpcImplementationTest extends AbstractRpcTest { - @Test - public void testInvokeRpc() throws Exception { - final AtomicReference assertError = new AtomicReference<>(); - try { - RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( - probeReg1.getRef(), schemaContext, getConfig()); - - final CompositeNode input = makeRPCInput("foo"); - final CompositeNode output = makeRPCOutput("bar"); - final AtomicReference invokeRpcMsg = setupInvokeRpcReply(assertError, output); - - ListenableFuture> future = rpcImpl.invokeRpc(TEST_RPC, input); - - RpcResult rpcResult = future.get(5, TimeUnit.SECONDS); - - assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0)); - - assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc()); - assertEquals("getInput", input, invokeRpcMsg.get().getInput()); - } finally { - if(assertError.get() != null) { - throw assertError.get(); - } - } - } - - @Test - public void testInvokeRpcWithIdentifier() throws Exception { - final AtomicReference assertError = new AtomicReference<>(); - try { - RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( - probeReg1.getRef(), schemaContext, getConfig()); - - QName instanceQName = new QName(new URI("ns"), "instance"); - YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName); - - CompositeNode input = makeRPCInput("foo"); - CompositeNode output = makeRPCOutput("bar"); - final AtomicReference invokeRpcMsg = setupInvokeRpcReply(assertError, output); - - ListenableFuture> future = rpcImpl.invokeRpc( - TEST_RPC, identifier, input); - - RpcResult rpcResult = future.get(5, TimeUnit.SECONDS); - - assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0)); - - assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc()); - assertEquals("getInput", input, invokeRpcMsg.get().getInput()); - assertEquals("getRoute", identifier, invokeRpcMsg.get().getIdentifier()); - } finally { - if(assertError.get() != null) { - throw assertError.get(); - } - } - } - - @Test - public void testInvokeRpcWithRpcErrorsException() throws Exception { - final AtomicReference assertError = new AtomicReference<>(); - try { - RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( - probeReg1.getRef(), schemaContext, getConfig()); - - final CompositeNode input = makeRPCInput("foo"); - - setupInvokeRpcErrorReply(assertError, new RpcErrorsException( - "mock", Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, "tag", - "error", "appTag", "info", null)))); - - ListenableFuture> future = rpcImpl.invokeRpc(TEST_RPC, input); - - RpcResult rpcResult = future.get(5, TimeUnit.SECONDS); - - assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "tag", - "error", "appTag", "info", null); - } finally { - if(assertError.get() != null) { - throw assertError.get(); - } - } - } - - @Test - public void testInvokeRpcWithOtherException() throws Exception { - final AtomicReference assertError = new AtomicReference<>(); - try { - RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( - probeReg1.getRef(), schemaContext, getConfig()); - - final CompositeNode input = makeRPCInput("foo"); - - setupInvokeRpcErrorReply(assertError, new TestException()); - - ListenableFuture> future = rpcImpl.invokeRpc(TEST_RPC, input); - - RpcResult rpcResult = future.get(5, TimeUnit.SECONDS); - - assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "operation-failed", - TestException.MESSAGE, null, null, TestException.MESSAGE); - } finally { - if(assertError.get() != null) { - throw assertError.get(); - } - } - } - - private AtomicReference setupInvokeRpcReply( - final AtomicReference assertError, final CompositeNode output) { - return setupInvokeRpcReply(assertError, output, null); - } - - private AtomicReference setupInvokeRpcErrorReply( - final AtomicReference assertError, final Exception error) { - return setupInvokeRpcReply(assertError, null, error); - } - - private AtomicReference setupInvokeRpcReply( - final AtomicReference assertError, final CompositeNode output, - final Exception error) { - final AtomicReference invokeRpcMsg = new AtomicReference<>(); - - new Thread() { - @Override - public void run() { - try { - invokeRpcMsg.set(probeReg1.expectMsgClass( - JavaTestKit.duration("5 seconds"), InvokeRpc.class)); - - if(output != null) { - probeReg1.reply(new RpcResponse(XmlUtils.outputCompositeNodeToXml( - output, schemaContext))); - } else { - probeReg1.reply(new akka.actor.Status.Failure(error)); - } - - } catch(AssertionError e) { - assertError.set(e); - } - } - - }.start(); - - return invokeRpcMsg; - } private RemoteRpcProviderConfig getConfig(){ return new RemoteRpcProviderConfig.Builder("unit-test").build(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java index 8b4599ca8c..5cd3df3a24 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java @@ -10,25 +10,11 @@ package org.opendaylight.controller.remote.rpc; -import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import com.typesafe.config.Config; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Test; -import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; -import org.opendaylight.controller.sal.core.api.model.SchemaService; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; - -import java.util.concurrent.TimeUnit; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class RemoteRpcProviderTest { @@ -38,7 +24,7 @@ public class RemoteRpcProviderTest { @BeforeClass public static void setup() throws InterruptedException { moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build(); - Config config = moduleConfig.get(); + final Config config = moduleConfig.get(); system = ActorSystem.create("odl-cluster-rpc", config); } @@ -49,21 +35,4 @@ public class RemoteRpcProviderTest { system = null; } - @Test - public void testRemoteRpcProvider() throws Exception { - RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(RpcProvisionRegistry.class)); - Broker.ProviderSession session = mock(Broker.ProviderSession.class); - SchemaService schemaService = mock(SchemaService.class); - when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class)); - when(session.getService(SchemaService.class)).thenReturn(schemaService); - - rpcProvider.onSessionInitiated(session); - - ActorRef actorRef = Await.result( - system.actorSelection( - moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)), - Duration.create(2, TimeUnit.SECONDS)); - - Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath())); - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java index 28b1b476cd..16b13910a8 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java @@ -9,269 +9,8 @@ package org.opendaylight.controller.remote.rpc; -import akka.actor.ActorRef; -import akka.japi.Pair; -import akka.testkit.JavaTestKit; - -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import static org.junit.Assert.assertEquals; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; -import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; -import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; -import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; -import org.opendaylight.controller.xml.codec.XmlUtils; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.any; public class RpcBrokerTest extends AbstractRpcTest { - @Test - public void testInvokeRpcWithNoRemoteActor() throws Exception { - new JavaTestKit(node1) {{ - CompositeNode input = makeRPCInput("foo"); - - InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input); - rpcBroker1.tell(invokeMsg, getRef()); - - probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class); - probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply( - Collections.>emptyList())); - - akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), - akka.actor.Status.Failure.class); - - assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass()); - }}; - } - - - /** - * This test method invokes and executes the remote rpc - */ - //@Test - public void testInvokeRpc() throws URISyntaxException { - new JavaTestKit(node1) {{ - QName instanceQName = new QName(new URI("ns"), "instance"); - - CompositeNode invokeRpcResult = makeRPCOutput("bar"); - RpcResult rpcResult = - RpcResultBuilder.success(invokeRpcResult).build(); - ArgumentCaptor inputCaptor = new ArgumentCaptor<>(); - when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture())) - .thenReturn(Futures.immediateFuture(rpcResult)); - - // invoke rpc - CompositeNode input = makeRPCInput("foo"); - YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName); - InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input); - rpcBroker1.tell(invokeMsg, getRef()); - - FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); - RouteIdentifier routeIdentifier = findRouters.getRouteIdentifier(); - assertEquals("getType", TEST_RPC, routeIdentifier.getType()); - assertEquals("getRoute", instanceID, routeIdentifier.getRoute()); - - probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply( - Arrays.asList(new Pair(rpcBroker2, 200L)))); - - RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); - assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0), - XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode())); - assertCompositeNodeEquals(input, inputCaptor.getValue()); - }}; - } - - @Test - public void testInvokeRpcWithNoOutput() { - new JavaTestKit(node1) {{ - - RpcResult rpcResult = RpcResultBuilder.success().build(); - when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) - .thenReturn(Futures.immediateFuture(rpcResult)); - - InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo")); - rpcBroker1.tell(invokeMsg, getRef()); - - probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); - probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply( - Arrays.asList(new Pair(rpcBroker2, 200L)))); - - RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); - - assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode()); - }}; - } - - @Test - public void testInvokeRpcWithExecuteFailure() { - new JavaTestKit(node1) {{ - - RpcResult rpcResult = RpcResultBuilder.failed() - .withError(ErrorType.RPC, "tag", "error", "appTag", "info", - new Exception("mock")) - .build(); - when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) - .thenReturn(Futures.immediateFuture(rpcResult)); - - InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo")); - rpcBroker1.tell(invokeMsg, getRef()); - - probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); - probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply( - Arrays.asList(new Pair(rpcBroker2, 200L)))); - - akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), - akka.actor.Status.Failure.class); - - assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass()); - - RpcErrorsException errorsEx = (RpcErrorsException)failure.cause(); - List rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors()); - assertEquals("RpcErrors count", 1, rpcErrors.size()); - assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag", - "error", "appTag", "info", "mock"); - }}; - } - - @Test - public void testInvokeRpcWithFindRoutersFailure() { - new JavaTestKit(node1) {{ - - InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo")); - rpcBroker1.tell(invokeMsg, getRef()); - - probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); - probeReg1.reply(new akka.actor.Status.Failure(new TestException())); - - akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), - akka.actor.Status.Failure.class); - - assertEquals("failure.cause()", TestException.class, failure.cause().getClass()); - }}; - } - - @Test - public void testExecuteRpc() { - new JavaTestKit(node1) {{ - - String xml = "foo"; - - CompositeNode invokeRpcResult = makeRPCOutput("bar"); - RpcResult rpcResult = - RpcResultBuilder.success(invokeRpcResult).build(); - ArgumentCaptor inputCaptor = new ArgumentCaptor<>(); - when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture())) - .thenReturn(Futures.immediateFuture(rpcResult)); - - ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC); - - rpcBroker1.tell(executeMsg, getRef()); - - RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); - - assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0), - XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode())); - }}; - } - - @Test - public void testExecuteRpcFailureWithRpcErrors() { - new JavaTestKit(node1) {{ - - String xml = "foo"; - - RpcResult rpcResult = RpcResultBuilder.failed() - .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1", - new Exception("mock")) - .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null) - .build(); - when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) - .thenReturn(Futures.immediateFuture(rpcResult)); - - ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC); - - rpcBroker1.tell(executeMsg, getRef()); - - akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), - akka.actor.Status.Failure.class); - - assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass()); - - RpcErrorsException errorsEx = (RpcErrorsException)failure.cause(); - List rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors()); - assertEquals("RpcErrors count", 2, rpcErrors.size()); - assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1", - "error", "appTag1", "info1", "mock"); - assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2", - "warning", "appTag2", "info2", null); - }}; - } - - @Test - public void testExecuteRpcFailureWithNoRpcErrors() { - new JavaTestKit(node1) {{ - - String xml = "foo"; - - RpcResult rpcResult = RpcResultBuilder.failed().build(); - when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) - .thenReturn(Futures.immediateFuture(rpcResult)); - - ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC); - - rpcBroker1.tell(executeMsg, getRef()); - - akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), - akka.actor.Status.Failure.class); - - assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass()); - - RpcErrorsException errorsEx = (RpcErrorsException)failure.cause(); - List rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors()); - assertEquals("RpcErrors count", 1, rpcErrors.size()); - assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, - "operation-failed", "failed", null, null, null); - }}; - } - - @Test - public void testExecuteRpcFailureWithException() { - new JavaTestKit(node1) {{ - - String xml = "foo"; - - when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) - .thenReturn(Futures.>immediateFailedFuture( - new TestException())); - - ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC); - - rpcBroker1.tell(executeMsg, getRef()); - - akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), - akka.actor.Status.Failure.class); - assertEquals("failure.cause()", TestException.class, failure.cause().getClass()); - }}; - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java index 956e159990..ecfaef8419 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java @@ -8,18 +8,11 @@ package org.opendaylight.controller.remote.rpc; -import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import com.typesafe.config.ConfigFactory; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Test; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.yangtools.yang.common.QName; - -import java.net.URI; -import java.net.URISyntaxException; public class RpcListenerTest { @@ -37,37 +30,4 @@ public class RpcListenerTest { system = null; } - @Test - public void testRpcAdd() throws URISyntaxException { - new JavaTestKit(system) { - { - JavaTestKit probeReg = new JavaTestKit(system); - ActorRef rpcRegistry = probeReg.getRef(); - - RpcListener rpcListener = new RpcListener(rpcRegistry); - - QName qName = new QName(new URI("actor2"), "actor2"); - - rpcListener.onRpcImplementationAdded(qName); - probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class); - }}; - - } - - @Test - public void testRpcRemove() throws URISyntaxException { - new JavaTestKit(system) { - { - JavaTestKit probeReg = new JavaTestKit(system); - ActorRef rpcRegistry = probeReg.getRef(); - - RpcListener rpcListener = new RpcListener(rpcRegistry); - - QName qName = new QName(new URI("actor2"), "actor2"); - - rpcListener.onRpcImplementationRemoved(qName); - probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class); - }}; - - } } -- 2.36.6