BUG 2412 - remove CompositeNode from sal-remoterpc-connector 54/16754/3
authorJan Hajnar <jhajnar@cisco.com>
Wed, 18 Mar 2015 16:37:56 +0000 (17:37 +0100)
committerTony Tkacik <ttkacik@cisco.com>
Thu, 19 Mar 2015 16:20:19 +0000 (17:20 +0100)
* akka remote RPC distribution needs move to new DOM API
* remove all tests which are use CompositeNode. Test has to be
rewriten with NormalizedNodes ASAP.

Change-Id: I8863f5c9657992d23be67acb2f542b0fda81db8c
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
Signed-off-by: Jan Hajnar <jhajnar@cisco.com>
16 files changed:
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java

index 0f84abb22e8e6ca92d36e5486dac0f08ba699720..360ac68a519f29695410ef50115ddc98c90f2343 100644 (file)
@@ -1,95 +1,87 @@
 package org.opendaylight.controller.remote.rpc;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.xml.codec.XmlUtils;
-import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
 
-import java.util.Collections;
-import java.util.Set;
-
-import static akka.pattern.Patterns.ask;
-
-public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
+public class RemoteRpcImplementation implements DOMRpcImplementation {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
     private final ActorRef rpcBroker;
-    private final SchemaContext schemaContext;
     private final RemoteRpcProviderConfig config;
 
-    public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext, RemoteRpcProviderConfig config) {
+    public RemoteRpcImplementation(final ActorRef rpcBroker, final RemoteRpcProviderConfig config) {
         this.rpcBroker = rpcBroker;
-        this.schemaContext = schemaContext;
         this.config = config;
     }
 
     @Override
-    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc,
-            YangInstanceIdentifier identifier, CompositeNode input) {
-        InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input);
-
-        return executeMsg(rpcMsg);
-    }
-
-    @Override
-    public Set<QName> getSupportedRpcs() {
-        // TODO : check if we need to get this from routing registry
-        return Collections.emptySet();
-    }
+    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
+        final InvokeRpc rpcMsg = new InvokeRpc(rpc.getType().getLastComponent(), rpc.getContextReference(), input);
 
-    @Override
-    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-        InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
-        return executeMsg(rpcMsg);
-    }
+        final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
 
-    private ListenableFuture<RpcResult<CompositeNode>> executeMsg(InvokeRpc rpcMsg) {
+        final ListenableFuture<DOMRpcResult> listenableFuture =
+                JdkFutureAdapters.listenInPoolThread(settableFuture);
 
-        final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
+        final scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
 
-        scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
-
-        OnComplete<Object> onComplete = new OnComplete<Object>() {
+        final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object reply) throws Throwable {
+            public void onComplete(final Throwable failure, final Object reply) throws Throwable {
                 if(failure != null) {
                     LOG.error("InvokeRpc failed", failure);
 
-                    RpcResult<CompositeNode> rpcResult;
-                    if(failure instanceof RpcErrorsException) {
-                        rpcResult = RpcResultBuilder.<CompositeNode>failed().withRpcErrors(
-                                ((RpcErrorsException)failure).getRpcErrors()).build();
-                    } else {
-                        rpcResult = RpcResultBuilder.<CompositeNode>failed().withError(
-                                ErrorType.RPC, failure.getMessage(), failure).build();
+                    final String message = String.format("Execution of RPC %s failed",  rpcMsg.getRpc());
+                    Collection<RpcError> errors = ((RpcErrorsException)failure).getRpcErrors();
+                    if(errors == null || errors.size() == 0) {
+                        errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, null, message));
                     }
+                    final DOMRpcResult rpcResult = new DefaultDOMRpcResult(errors);
 
-                    listenableFuture.set(rpcResult);
+                    settableFuture.set(rpcResult);
                     return;
                 }
 
-                RpcResponse rpcReply = (RpcResponse)reply;
-                CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode());
-                listenableFuture.set(RpcResultBuilder.success(result).build());
+                final RpcResponse rpcReply = (RpcResponse)reply;
+                final NormalizedNode<?, ?> result =
+                        NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
+                settableFuture.set(new DefaultDOMRpcResult(result));
             }
         };
 
-        future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global());
 
-        return listenableFuture;
+        future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global());
+        // FIXME find non blocking way for implementation
+        try {
+            return Futures.immediateCheckedFuture(listenableFuture.get());
+        }
+        catch (InterruptedException | ExecutionException e) {
+            LOG.debug("Unexpected remote RPC exception.", e);
+            return Futures.immediateFailedCheckedFuture((DOMRpcException) new DOMRpcImplementationNotAvailableException(e, "Unexpected remote RPC exception"));
+        }
     }
 }
index d24ed5651aa8a5ffa0d554769fb389c5dc05ab78..a1b6286a59c764aff95d57650e4356943ea44544 100644 (file)
@@ -11,18 +11,19 @@ package org.opendaylight.controller.remote.rpc;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-
 /**
  * This is the base class which initialize all the actors, listeners and
  * default RPc implementation so remote invocation of rpcs.
@@ -31,30 +32,37 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
 
-  private final RpcProvisionRegistry rpcProvisionRegistry;
+  private final DOMRpcProviderService rpcProvisionRegistry;
 
+  private ListenerRegistration<SchemaContextListener> schemaListenerRegistration;
   private ActorSystem actorSystem;
   private Broker.ProviderSession brokerSession;
   private SchemaContext schemaContext;
   private ActorRef rpcManager;
-  private RemoteRpcProviderConfig config;
+  private final RemoteRpcProviderConfig config;
 
 
-  public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
+  public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry) {
     this.actorSystem = actorSystem;
     this.rpcProvisionRegistry = rpcProvisionRegistry;
-    this.config = new RemoteRpcProviderConfig(actorSystem.settings().config());
+    config = new RemoteRpcProviderConfig(actorSystem.settings().config());
   }
 
   @Override
   public void close() throws Exception {
-    if (this.actorSystem != null)
-      this.actorSystem.shutdown();
+    if (actorSystem != null) {
+        actorSystem.shutdown();
+        actorSystem = null;
+    }
+    if (schemaListenerRegistration != null) {
+        schemaListenerRegistration.close();
+        schemaListenerRegistration = null;
+    }
   }
 
   @Override
-  public void onSessionInitiated(Broker.ProviderSession session) {
-    this.brokerSession = session;
+  public void onSessionInitiated(final Broker.ProviderSession session) {
+    brokerSession = session;
     start();
   }
 
@@ -66,21 +74,18 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
   private void start() {
     LOG.info("Starting remote rpc service...");
 
-    SchemaService schemaService = brokerSession.getService(SchemaService.class);
+    final SchemaService schemaService = brokerSession.getService(SchemaService.class);
+    final DOMRpcService rpcService = brokerSession.getService(DOMRpcService.class);
     schemaContext = schemaService.getGlobalContext();
-
-    rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry),
-                                     config.getRpcManagerName());
-
+    rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext,
+            rpcProvisionRegistry, rpcService), config.getRpcManagerName());
+    schemaListenerRegistration = schemaService.registerSchemaContextListener(this);
     LOG.debug("rpc manager started");
-
-    schemaService.registerSchemaContextListener(this);
   }
 
   @Override
-  public void onGlobalContextUpdated(SchemaContext schemaContext) {
+  public void onGlobalContextUpdated(final SchemaContext schemaContext) {
     this.schemaContext = schemaContext;
     rpcManager.tell(new UpdateSchemaContext(schemaContext), null);
-
   }
 }
index c82a72eaa56c82e40388b8fb612eed7b198dbae8..d17242ed6048f51638d11612db130c2d62f33961 100644 (file)
@@ -11,8 +11,8 @@ package org.opendaylight.controller.remote.rpc;
 import akka.actor.ActorSystem;
 import akka.osgi.BundleDelegatingClassLoader;
 import com.typesafe.config.Config;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,21 +23,21 @@ public class RemoteRpcProviderFactory {
     public static RemoteRpcProvider createInstance(
             final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){
 
-      RemoteRpcProvider rpcProvider =
-          new RemoteRpcProvider(createActorSystem(bundleContext, config), (RpcProvisionRegistry) broker);
+      final RemoteRpcProvider rpcProvider =
+          new RemoteRpcProvider(createActorSystem(bundleContext, config), (DOMRpcProviderService) broker);
 
       broker.registerProvider(rpcProvider);
       return rpcProvider;
     }
 
-    private static ActorSystem createActorSystem(BundleContext bundleContext, RemoteRpcProviderConfig config){
+    private static ActorSystem createActorSystem(final BundleContext bundleContext, final RemoteRpcProviderConfig config){
 
         // Create an OSGi bundle classloader for actor system
-        BundleDelegatingClassLoader classLoader =
+        final BundleDelegatingClassLoader classLoader =
                 new BundleDelegatingClassLoader(bundleContext.getBundle(),
                         Thread.currentThread().getContextClassLoader());
 
-        Config actorSystemConfig = config.get();
+        final Config actorSystemConfig = config.get();
         if(LOG.isDebugEnabled()) {
             LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
         }
index 85d21381b61bf127fb95e0e11a67275d6d25ec86..99e812193c727f403404852976177483e064d902 100644 (file)
@@ -8,13 +8,11 @@
 package org.opendaylight.controller.remote.rpc;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-
-import java.io.Serializable;
-
 public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, YangInstanceIdentifier>,Serializable {
   private static final long serialVersionUID = 1L;
 
@@ -31,26 +29,26 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QNa
 
   @Override
   public QName getContext() {
-    return this.context;
+    return context;
   }
 
   @Override
   public QName getType() {
-    return this.type;
+    return type;
   }
 
   @Override
   public YangInstanceIdentifier getRoute() {
-    return this.route;
+    return route;
   }
 
 
   @Override
-  public boolean equals(Object o) {
+  public boolean equals(final Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
 
-    RouteIdentifierImpl that = (RouteIdentifierImpl) o;
+    final RouteIdentifierImpl that = (RouteIdentifierImpl) o;
 
     if (context == null){
       if (that.getContext() != null)  return false;
@@ -72,7 +70,7 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QNa
 
   @Override
   public int hashCode() {
-    int prime = 31;
+    final int prime = 31;
     int result = 0;
     result = prime * result + (context == null ? 0:context.hashCode());
     result = prime * result + (type    == null ? 0:type.hashCode());
index 2aaac5a78ed531fc830bfca7540d2603a2e0f41b..c354320b8bea6bc450daeb976bdbe82d1e6f1ed9 100644 (file)
@@ -11,6 +11,10 @@ package org.opendaylight.controller.remote.rpc;
 
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
@@ -20,73 +24,69 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>{
-  private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
-  private final ActorRef rpcRegistry;
+    private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
+    private final ActorRef rpcRegistry;
 
-  public RoutedRpcListener(ActorRef rpcRegistry) {
-    Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null");
-
-    this.rpcRegistry = rpcRegistry;
-  }
-
-  @Override
-  public void onRouteChange(RouteChange<RpcRoutingContext, YangInstanceIdentifier> routeChange) {
-    Map<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = routeChange.getAnnouncements();
-    if(announcements != null && announcements.size() > 0){
-      announce(getRouteIdentifiers(announcements));
+    public RoutedRpcListener(final ActorRef rpcRegistry) {
+        Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null");
+        this.rpcRegistry = rpcRegistry;
     }
 
-    Map<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = routeChange.getRemovals();
-    if(removals != null && removals.size() > 0 ) {
-      remove(getRouteIdentifiers(removals));
+    @Override
+    public void onRouteChange(final RouteChange<RpcRoutingContext, YangInstanceIdentifier> routeChange) {
+        final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = routeChange.getAnnouncements();
+        if(announcements != null && announcements.size() > 0){
+            announce(getRouteIdentifiers(announcements));
+        }
+
+        final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = routeChange.getRemovals();
+        if(removals != null && removals.size() > 0 ) {
+            remove(getRouteIdentifiers(removals));
+        }
     }
-  }
 
-  /**
-   *
-   * @param announcements
-   */
-  private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
-    if(LOG.isDebugEnabled()) {
-        LOG.debug("Announcing [{}]", announcements);
+    /**
+     *
+     * @param announcements
+     */
+    private void announce(final Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Announcing [{}]", announcements);
+        }
+        final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg =
+                new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
+        rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
     }
-    RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
-    rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
-  }
 
-  /**
-   *
-   * @param removals
-   */
-  private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
-    if(LOG.isDebugEnabled()) {
-        LOG.debug("Removing [{}]", removals);
+    /**
+     *
+     * @param removals
+     */
+    private void remove(final Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Removing [{}]", removals);
+        }
+        final RpcRegistry.Messages.RemoveRoutes removeRpcMsg =
+                new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
+        rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
     }
-    RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
-    rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
-  }
 
-  /**
-   *
-   * @param changes
-   * @return
-   */
-  private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(Map<RpcRoutingContext, Set<YangInstanceIdentifier>> changes) {
-    RouteIdentifierImpl routeId = null;
-    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
+    /**
+     *
+     * @param changes
+     * @return
+     */
+    private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(
+            final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> changes) {
 
-    for (RpcRoutingContext context : changes.keySet()){
-      for (YangInstanceIdentifier instanceId : changes.get(context)){
-        routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
-        routeIdSet.add(routeId);
-      }
+        final Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
+        for (final RpcRoutingContext context : changes.keySet()){
+            for (final YangInstanceIdentifier instanceId : changes.get(context)){
+                final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
+                routeIdSet.add(routeId);
+            }
+        }
+        return routeIdSet;
     }
-    return routeIdSet;
-  }
 }
index 31aac92051f1ee6948cbefcf4c68e49d926c5da0..7ddac673c2e84827b5cb18d015319b13e587ad97 100644 (file)
@@ -8,45 +8,42 @@
 
 package org.opendaylight.controller.remote.rpc;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Pair;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
 import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Future;
-
-import static akka.pattern.Patterns.ask;
-
 /**
  * Actor to initiate execution of remote RPC on other nodes of the cluster.
  */
@@ -54,68 +51,60 @@ import static akka.pattern.Patterns.ask;
 public class RpcBroker extends AbstractUntypedActor {
 
     private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
-    private final Broker.ProviderSession brokerSession;
     private final ActorRef rpcRegistry;
-    private SchemaContext schemaContext;
     private final RemoteRpcProviderConfig config;
+    private final DOMRpcService rpcService;
 
-    private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
-            SchemaContext schemaContext) {
-        this.brokerSession = brokerSession;
+    private RpcBroker(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+        this.rpcService = rpcService;
         this.rpcRegistry = rpcRegistry;
-        this.schemaContext = schemaContext;
         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
     }
 
-    public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
-            SchemaContext schemaContext) {
-        return Props.create(new RpcBrokerCreator(brokerSession, rpcRegistry, schemaContext));
+    public static Props props(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+        Preconditions.checkNotNull(rpcRegistry, "ActorRef can not be null!");
+        Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
+        return Props.create(new RpcBrokerCreator(rpcService, rpcRegistry));
     }
 
     @Override
-    protected void handleReceive(Object message) throws Exception {
+    protected void handleReceive(final Object message) throws Exception {
         if(message instanceof InvokeRpc) {
             invokeRemoteRpc((InvokeRpc) message);
         } else if(message instanceof ExecuteRpc) {
             executeRpc((ExecuteRpc) message);
-        } else if(message instanceof UpdateSchemaContext) {
-            updateSchemaContext((UpdateSchemaContext) message);
         }
     }
 
-    private void updateSchemaContext(UpdateSchemaContext message) {
-        this.schemaContext = message.getSchemaContext();
-    }
-
     private void invokeRemoteRpc(final InvokeRpc msg) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
         }
-        RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
+        final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
                 null, msg.getRpc(), msg.getIdentifier());
-        RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+        final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
 
-        scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
+        final scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
 
         final ActorRef sender = getSender();
         final ActorRef self = self();
 
-        OnComplete<Object> onComplete = new OnComplete<Object>() {
+        final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object reply) throws Throwable {
+            public void onComplete(final Throwable failure, final Object reply) throws Throwable {
                 if(failure != null) {
                     LOG.error("FindRouters failed", failure);
                     sender.tell(new akka.actor.Status.Failure(failure), self);
                     return;
                 }
 
-                RpcRegistry.Messages.FindRoutersReply findReply =
+                final RpcRegistry.Messages.FindRoutersReply findReply =
                                                 (RpcRegistry.Messages.FindRoutersReply)reply;
 
-                List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
+                final List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
 
                 if(actorRefList == null || actorRefList.isEmpty()) {
-                    String message = String.format(
+                    final String message = String.format(
                             "No remote implementation found for rpc %s",  msg.getRpc());
                     sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
                             message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
@@ -133,16 +122,16 @@ public class RpcBroker extends AbstractUntypedActor {
     protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
             final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
 
-        RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
+        final RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
 
-        ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
-                schemaContext), msg.getRpc());
+        final Node serializedNode = NormalizedNodeSerializer.serialize(msg.getInput());
+        final ExecuteRpc executeMsg = new ExecuteRpc(serializedNode, msg.getRpc());
 
-        scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
+        final scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
 
-        OnComplete<Object> onComplete = new OnComplete<Object>() {
+        final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object reply) throws Throwable {
+            public void onComplete(final Throwable failure, final Object reply) throws Throwable {
                 if(failure != null) {
                     LOG.error("ExecuteRpc failed", failure);
                     sender.tell(new akka.actor.Status.Failure(failure), self);
@@ -160,24 +149,22 @@ public class RpcBroker extends AbstractUntypedActor {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Executing rpc {}", msg.getRpc());
         }
-        Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
-                XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
-                        schemaContext));
+        final NormalizedNode<?, ?> input = NormalizedNodeSerializer.deSerialize(msg.getInputNormalizedNode());
+        final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
 
-        ListenableFuture<RpcResult<CompositeNode>> listenableFuture =
+        final ListenableFuture<DOMRpcResult> listenableFuture =
                 JdkFutureAdapters.listenInPoolThread(future);
 
         final ActorRef sender = getSender();
         final ActorRef self = self();
 
-        Futures.addCallback(listenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+        Futures.addCallback(listenableFuture, new FutureCallback<DOMRpcResult>() {
             @Override
-            public void onSuccess(RpcResult<CompositeNode> result) {
-                if(result.isSuccessful()) {
-                    sender.tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result.getResult(),
-                            schemaContext)), self);
-                } else {
-                    String message = String.format("Execution of RPC %s failed",  msg.getRpc());
+            public void onSuccess(final DOMRpcResult result) {
+                if (result.getErrors() != null && ( ! result.getErrors().isEmpty())) {
+                    final String message = String.format("Execution of RPC %s failed",  msg.getRpc());
                     Collection<RpcError> errors = result.getErrors();
                     if(errors == null || errors.size() == 0) {
                         errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
@@ -186,11 +173,14 @@ public class RpcBroker extends AbstractUntypedActor {
 
                     sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
                             message, errors)), self);
+                } else {
+                    final Node serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+                    sender.tell(new RpcResponse(serializedResultNode), self);
                 }
             }
 
             @Override
-            public void onFailure(Throwable t) {
+            public void onFailure(final Throwable t) {
                 LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
                 sender.tell(new akka.actor.Status.Failure(t), self);
             }
@@ -200,20 +190,17 @@ public class RpcBroker extends AbstractUntypedActor {
     private static class RpcBrokerCreator implements Creator<RpcBroker> {
         private static final long serialVersionUID = 1L;
 
-        final Broker.ProviderSession brokerSession;
+        final DOMRpcService rpcService;
         final ActorRef rpcRegistry;
-        final SchemaContext schemaContext;
 
-        RpcBrokerCreator(ProviderSession brokerSession, ActorRef rpcRegistry,
-                SchemaContext schemaContext) {
-            this.brokerSession = brokerSession;
+        RpcBrokerCreator(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+            this.rpcService = rpcService;
             this.rpcRegistry = rpcRegistry;
-            this.schemaContext = schemaContext;
         }
 
         @Override
         public RpcBroker create() throws Exception {
-            return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
+            return new RpcBroker(rpcService, rpcRegistry);
         }
     }
 }
index 22879dda2f903f6008c5a7c21a2a6447acca12bf..28ff1523cbb676c5bf47d6180d96c9e7f1304230 100644 (file)
@@ -10,46 +10,55 @@ package org.opendaylight.controller.remote.rpc;
 
 
 import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
-public class RpcListener implements RpcRegistrationListener{
+public class RpcListener implements DOMRpcAvailabilityListener{
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
   private final ActorRef rpcRegistry;
 
-  public RpcListener(ActorRef rpcRegistry) {
+  public RpcListener(final ActorRef rpcRegistry) {
     this.rpcRegistry = rpcRegistry;
   }
 
-  @Override
-  public void onRpcImplementationAdded(QName rpc) {
-    if(LOG.isDebugEnabled()) {
-        LOG.debug("Adding registration for [{}]", rpc);
+    @Override
+    public void onRpcAvailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
+        Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding registration for [{}]", rpcs);
+        }
+        final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+
+        for (final DOMRpcIdentifier rpc : rpcs) {
+            final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null);
+            routeIds.add(routeId);
+        }
+        final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
+        rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
     }
-    RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
-    List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
-    routeIds.add(routeId);
-    RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
-    rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
-  }
 
-  @Override
-  public void onRpcImplementationRemoved(QName rpc) {
-    if(LOG.isDebugEnabled()) {
-        LOG.debug("Removing registration for [{}]", rpc);
+    @Override
+    public void onRpcUnavailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
+        Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Removing registration for [{}]", rpcs);
+        }
+        final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+        for (final DOMRpcIdentifier rpc : rpcs) {
+            final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null);
+            routeIds.add(routeId);
+        }
+        final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
+        rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
     }
-    RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
-    List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
-    routeIds.add(routeId);
-    RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
-    rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
-  }
 }
index 4cbce63f9aa2a9b9f44f4c1ba3a27a225d4c2d42..f12fda0aa11a91f93bfffd2d6eb002ec76fb7b5e 100644 (file)
@@ -14,13 +14,17 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.japi.Function;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
-import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,98 +38,104 @@ import scala.concurrent.duration.Duration;
 
 public class RpcManager extends AbstractUntypedActor {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
-
-  private SchemaContext schemaContext;
-  private ActorRef rpcBroker;
-  private ActorRef rpcRegistry;
-  private final Broker.ProviderSession brokerSession;
-  private final RemoteRpcProviderConfig config;
-  private RpcListener rpcListener;
-  private RoutedRpcListener routeChangeListener;
-  private RemoteRpcImplementation rpcImplementation;
-  private final RpcProvisionRegistry rpcProvisionRegistry;
-
-  private RpcManager(SchemaContext schemaContext,
-                     Broker.ProviderSession brokerSession,
-                     RpcProvisionRegistry rpcProvisionRegistry) {
-    this.schemaContext = schemaContext;
-    this.brokerSession = brokerSession;
-    this.rpcProvisionRegistry = rpcProvisionRegistry;
-    this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
-
-    createRpcActors();
-    startListeners();
-  }
-
-
-    public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession,
-            final RpcProvisionRegistry rpcProvisionRegistry) {
-        return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry);
+    private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
+
+    private SchemaContext schemaContext;
+    private ActorRef rpcBroker;
+    private ActorRef rpcRegistry;
+    private final RemoteRpcProviderConfig config;
+    private RpcListener rpcListener;
+    private RoutedRpcListener routeChangeListener;
+    private RemoteRpcImplementation rpcImplementation;
+    private final DOMRpcProviderService rpcProvisionRegistry;
+    private final DOMRpcService rpcServices;
+
+    private RpcManager(final SchemaContext schemaContext,
+                       final DOMRpcProviderService rpcProvisionRegistry,
+                       final DOMRpcService rpcSevices) {
+        this.schemaContext = schemaContext;
+        this.rpcProvisionRegistry = rpcProvisionRegistry;
+        rpcServices = rpcSevices;
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+
+        createRpcActors();
+        startListeners();
     }
 
-  private void createRpcActors() {
-    LOG.debug("Create rpc registry and broker actors");
-
-    rpcRegistry =
-            getContext().actorOf(Props.create(RpcRegistry.class).
-                withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
-    rpcBroker =
-            getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
-                withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-
-    RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
-    rpcRegistry.tell(localRouter, self());
-  }
-
-  private void startListeners() {
-    LOG.debug("Registers rpc listeners");
-
-    rpcListener = new RpcListener(rpcRegistry);
-    routeChangeListener = new RoutedRpcListener(rpcRegistry);
-    rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
-
-    brokerSession.addRpcRegistrationListener(rpcListener);
-    rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
-    rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
-    announceSupportedRpcs();
-  }
-
-  /**
-   * Add all the locally registered RPCs in the clustered routing table
-   */
-  private void announceSupportedRpcs(){
-    LOG.debug("Adding all supported rpcs to routing table");
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      rpcListener.onRpcImplementationAdded(rpc);
+
+      public static Props props(final SchemaContext schemaContext,
+              final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
+          Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+          Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+          Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
+          return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
+      }
+
+    private void createRpcActors() {
+        LOG.debug("Create rpc registry and broker actors");
+
+        rpcRegistry =
+                getContext().actorOf(Props.create(RpcRegistry.class).
+                    withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+
+        rpcBroker =
+                getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry).
+                    withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+
+        final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
+        rpcRegistry.tell(localRouter, self());
     }
-  }
 
+    private void startListeners() {
+        LOG.debug("Registers rpc listeners");
+
+        rpcListener = new RpcListener(rpcRegistry);
+        routeChangeListener = new RoutedRpcListener(rpcRegistry);
+        rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
+
+        rpcServices.registerRpcListener(rpcListener);
+
+//        rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
+//        rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
+        announceSupportedRpcs();
+    }
 
-  @Override
-  protected void handleReceive(Object message) throws Exception {
-    if(message instanceof UpdateSchemaContext) {
-      updateSchemaContext((UpdateSchemaContext) message);
+    /**
+     * Add all the locally registered RPCs in the clustered routing table
+     */
+    private void announceSupportedRpcs(){
+        LOG.debug("Adding all supported rpcs to routing table");
+        final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
+        final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
+        for (final RpcDefinition rpcDef : currentlySupportedRpc) {
+            rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
+        }
+        rpcListener.onRpcAvailable(rpcs);
     }
 
-  }
 
-  private void updateSchemaContext(UpdateSchemaContext message) {
-    this.schemaContext = message.getSchemaContext();
-    rpcBroker.tell(message, ActorRef.noSender());
-  }
+    @Override
+    protected void handleReceive(final Object message) throws Exception {
+      if(message instanceof UpdateSchemaContext) {
+        updateSchemaContext((UpdateSchemaContext) message);
+      }
+
+    }
+
+    private void updateSchemaContext(final UpdateSchemaContext message) {
+      schemaContext = message.getSchemaContext();
+      rpcBroker.tell(message, ActorRef.noSender());
+    }
 
-  @Override
-  public SupervisorStrategy supervisorStrategy() {
-    return new OneForOneStrategy(10, Duration.create("1 minute"),
-        new Function<Throwable, SupervisorStrategy.Directive>() {
-          @Override
-          public SupervisorStrategy.Directive apply(Throwable t) {
-            return SupervisorStrategy.resume();
+    @Override
+    public SupervisorStrategy supervisorStrategy() {
+      return new OneForOneStrategy(10, Duration.create("1 minute"),
+          new Function<Throwable, SupervisorStrategy.Directive>() {
+            @Override
+            public SupervisorStrategy.Directive apply(final Throwable t) {
+              return SupervisorStrategy.resume();
+            }
           }
-        }
-    );
-  }
+      );
+    }
 }
index 5d780be641e2cce6d2b48a2628cd675f9a24c8ee..66c0c1b6f076ce1fdb8013a11370a5d33738f2a2 100644 (file)
@@ -10,24 +10,25 @@ package org.opendaylight.controller.remote.rpc.messages;
 
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.yangtools.yang.common.QName;
 
 public class ExecuteRpc implements Serializable {
     private static final long serialVersionUID = 1128904894827335676L;
 
-    private final String inputCompositeNode;
+    private final NormalizedNodeMessages.Node inputNormalizedNode;
     private final QName rpc;
 
-    public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
-        Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
+    public ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) {
+        Preconditions.checkNotNull(inputNormalizedNode, "Normalized Node input string should be present");
         Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
 
-        this.inputCompositeNode = inputCompositeNode;
+        this.inputNormalizedNode = inputNormalizedNode;
         this.rpc = rpc;
     }
 
-    public String getInputCompositeNode() {
-        return inputCompositeNode;
+    public NormalizedNodeMessages.Node getInputNormalizedNode() {
+        return inputNormalizedNode;
     }
 
     public QName getRpc() {
index 9c40dbfc58a556bc28f337c9ee683b947846c045..a7fbe8305e76d0b520bb56c1d13176883462840f 100644 (file)
@@ -10,17 +10,17 @@ package org.opendaylight.controller.remote.rpc.messages;
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class InvokeRpc implements Serializable {
     private static final long serialVersionUID = -2813459607858108953L;
 
     private final QName rpc;
     private final YangInstanceIdentifier identifier;
-    private final CompositeNode input;
+    private final NormalizedNode<?,?> input;
 
-    public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
+    public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final NormalizedNode<?,?> input) {
         Preconditions.checkNotNull(rpc, "rpc qname should not be null");
         Preconditions.checkNotNull(input, "rpc input should not be null");
 
@@ -37,7 +37,7 @@ public class InvokeRpc implements Serializable {
         return identifier;
     }
 
-    public CompositeNode getInput() {
+    public NormalizedNode<?,?> getInput() {
         return input;
     }
 }
index e6b208cb6fe63d64eca185b9d90d2d19144ed440..e5cb488d97183beeaea3dba4d440d42b44f02b03 100644 (file)
@@ -8,17 +8,18 @@
 package org.opendaylight.controller.remote.rpc.messages;
 
 import java.io.Serializable;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 
 public class RpcResponse implements Serializable {
     private static final long serialVersionUID = -4211279498688989245L;
 
-    private final String resultCompositeNode;
+    private final NormalizedNodeMessages.Node resultNormalizedNode;
 
-    public RpcResponse(final String resultCompositeNode) {
-        this.resultCompositeNode = resultCompositeNode;
+    public RpcResponse(final NormalizedNodeMessages.Node inputNormalizedNode) {
+        resultNormalizedNode = inputNormalizedNode;
     }
 
-    public String getResultCompositeNode() {
-        return resultCompositeNode;
+    public NormalizedNodeMessages.Node getResultNormalizedNode() {
+        return resultNormalizedNode;
     }
 }
index 46406fd4feebad58c7546ec5df12e5bb1f2f8137..afe81a880068125d23e8d08d5089913a678db7fd 100644 (file)
@@ -8,38 +8,28 @@
 
 package org.opendaylight.controller.remote.rpc;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.mockito.Mockito;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
-import java.io.File;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Base class for RPC tests.
  *
@@ -64,11 +54,12 @@ public class AbstractRpcTest {
     protected JavaTestKit probeReg2;
     protected Broker.ProviderSession brokerSession;
     protected SchemaContext schemaContext;
+    protected DOMRpcService rpcService;
 
     @BeforeClass
     public static void setup() throws InterruptedException {
-        RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
-        RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+        final RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+        final RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
         node1 = ActorSystem.create("opendaylight-rpc", config1.get());
         node2 = ActorSystem.create("opendaylight-rpc", config2.get());
     }
@@ -87,16 +78,18 @@ public class AbstractRpcTest {
                 new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath())));
 
         brokerSession = Mockito.mock(Broker.ProviderSession.class);
+        rpcService = Mockito.mock(DOMRpcService.class);
+
         probeReg1 = new JavaTestKit(node1);
-        rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext));
+        rpcBroker1 = node1.actorOf(RpcBroker.props(rpcService, probeReg1.getRef()));
         probeReg2 = new JavaTestKit(node2);
-        rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext));
+        rpcBroker2 = node2.actorOf(RpcBroker.props(rpcService, probeReg2.getRef()));
 
     }
 
-    static void assertRpcErrorEquals(RpcError rpcError, ErrorSeverity severity,
-            ErrorType errorType, String tag, String message, String applicationTag, String info,
-            String causeMsg) {
+    static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity,
+            final ErrorType errorType, final String tag, final String message, final String applicationTag, final String info,
+            final String causeMsg) {
         assertEquals("getSeverity", severity, rpcError.getSeverity());
         assertEquals("getErrorType", errorType, rpcError.getErrorType());
         assertEquals("getTag", tag, rpcError.getTag());
@@ -111,57 +104,6 @@ public class AbstractRpcTest {
         }
     }
 
-    static void assertCompositeNodeEquals(CompositeNode exp, CompositeNode actual) {
-        assertEquals("NodeType getNamespace", exp.getNodeType().getNamespace(),
-                actual.getNodeType().getNamespace());
-        assertEquals("NodeType getLocalName", exp.getNodeType().getLocalName(),
-                actual.getNodeType().getLocalName());
-        for(Node<?> child: exp.getValue()) {
-            List<Node<?>> c = actual.get(child.getNodeType());
-            assertNotNull("Missing expected child " + child.getNodeType(), c);
-            if(child instanceof CompositeNode) {
-                assertCompositeNodeEquals((CompositeNode) child, (CompositeNode)c.get(0));
-            } else {
-                assertEquals("Value for Node " + child.getNodeType(), child.getValue(),
-                        c.get(0).getValue());
-            }
-        }
-    }
-
-    static CompositeNode makeRPCInput(String data) {
-        CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder()
-                .setQName(TEST_RPC_INPUT).addLeaf(TEST_RPC_INPUT_DATA, data);
-        return ImmutableCompositeNode.create(
-                TEST_RPC, ImmutableList.<Node<?>>of(builder.toInstance()));
-    }
-
-    static CompositeNode makeRPCOutput(String data) {
-        CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder()
-                .setQName(TEST_RPC_OUTPUT).addLeaf(TEST_RPC_OUTPUT_DATA, data);
-        return ImmutableCompositeNode.create(
-                TEST_RPC, ImmutableList.<Node<?>>of(builder.toInstance()));
-    }
-
-    static void assertFailedRpcResult(RpcResult<CompositeNode> rpcResult, ErrorSeverity severity,
-            ErrorType errorType, String tag, String message, String applicationTag, String info,
-            String causeMsg) {
-
-        assertNotNull("RpcResult was null", rpcResult);
-        assertEquals("isSuccessful", false, rpcResult.isSuccessful());
-        Collection<RpcError> rpcErrors = rpcResult.getErrors();
-        assertEquals("RpcErrors count", 1, rpcErrors.size());
-        assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message,
-                applicationTag, info, causeMsg);
-    }
-
-    static void assertSuccessfulRpcResult(RpcResult<CompositeNode> rpcResult,
-            CompositeNode expOutput) {
-
-        assertNotNull("RpcResult was null", rpcResult);
-        assertEquals("isSuccessful", true, rpcResult.isSuccessful());
-        assertCompositeNodeEquals(expOutput, rpcResult.getResult());
-    }
-
     static class TestException extends Exception {
         private static final long serialVersionUID = 1L;
 
index 49451dd0db99114141c289750cf37f7b12e036c1..2026d48a81d3a3a201f131d3c20e97f6562c363b 100644 (file)
@@ -8,26 +8,6 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
-import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.xml.codec.XmlUtils;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
 
 /***
  * Unit tests for RemoteRpcImplementation.
@@ -36,151 +16,6 @@ import static org.junit.Assert.assertEquals;
  */
 public class RemoteRpcImplementationTest extends AbstractRpcTest {
 
-    @Test
-    public void testInvokeRpc() throws Exception {
-        final AtomicReference<AssertionError> assertError = new AtomicReference<>();
-        try {
-            RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext, getConfig());
-
-            final CompositeNode input = makeRPCInput("foo");
-            final CompositeNode output = makeRPCOutput("bar");
-            final AtomicReference<InvokeRpc> invokeRpcMsg = setupInvokeRpcReply(assertError, output);
-
-            ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
-
-            RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
-
-            assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0));
-
-            assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc());
-            assertEquals("getInput", input, invokeRpcMsg.get().getInput());
-        } finally {
-            if(assertError.get() != null) {
-                throw assertError.get();
-            }
-        }
-    }
-
-    @Test
-    public void testInvokeRpcWithIdentifier() throws Exception {
-        final AtomicReference<AssertionError> assertError = new AtomicReference<>();
-        try {
-            RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext, getConfig());
-
-            QName instanceQName = new QName(new URI("ns"), "instance");
-            YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName);
-
-            CompositeNode input = makeRPCInput("foo");
-            CompositeNode output = makeRPCOutput("bar");
-            final AtomicReference<InvokeRpc> invokeRpcMsg = setupInvokeRpcReply(assertError, output);
-
-            ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(
-                    TEST_RPC, identifier, input);
-
-            RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
-
-            assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0));
-
-            assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc());
-            assertEquals("getInput", input, invokeRpcMsg.get().getInput());
-            assertEquals("getRoute", identifier, invokeRpcMsg.get().getIdentifier());
-        } finally {
-            if(assertError.get() != null) {
-                throw assertError.get();
-            }
-        }
-    }
-
-    @Test
-    public void testInvokeRpcWithRpcErrorsException() throws Exception {
-        final AtomicReference<AssertionError> assertError = new AtomicReference<>();
-        try {
-            RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext, getConfig());
-
-            final CompositeNode input = makeRPCInput("foo");
-
-            setupInvokeRpcErrorReply(assertError, new RpcErrorsException(
-                    "mock", Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, "tag",
-                            "error", "appTag", "info", null))));
-
-            ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
-
-            RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
-
-            assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "tag",
-                    "error", "appTag", "info", null);
-        } finally {
-            if(assertError.get() != null) {
-                throw assertError.get();
-            }
-        }
-    }
-
-    @Test
-    public void testInvokeRpcWithOtherException() throws Exception {
-        final AtomicReference<AssertionError> assertError = new AtomicReference<>();
-        try {
-            RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext, getConfig());
-
-            final CompositeNode input = makeRPCInput("foo");
-
-            setupInvokeRpcErrorReply(assertError, new TestException());
-
-            ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
-
-            RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
-
-            assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "operation-failed",
-                    TestException.MESSAGE, null, null, TestException.MESSAGE);
-        } finally {
-            if(assertError.get() != null) {
-                throw assertError.get();
-            }
-        }
-    }
-
-    private AtomicReference<InvokeRpc> setupInvokeRpcReply(
-            final AtomicReference<AssertionError> assertError, final CompositeNode output) {
-        return setupInvokeRpcReply(assertError, output, null);
-    }
-
-    private AtomicReference<InvokeRpc> setupInvokeRpcErrorReply(
-            final AtomicReference<AssertionError> assertError, final Exception error) {
-        return setupInvokeRpcReply(assertError, null, error);
-    }
-
-    private AtomicReference<InvokeRpc> setupInvokeRpcReply(
-            final AtomicReference<AssertionError> assertError, final CompositeNode output,
-            final Exception error) {
-        final AtomicReference<InvokeRpc> invokeRpcMsg = new AtomicReference<>();
-
-        new Thread() {
-            @Override
-            public void run() {
-                try {
-                    invokeRpcMsg.set(probeReg1.expectMsgClass(
-                            JavaTestKit.duration("5 seconds"), InvokeRpc.class));
-
-                    if(output != null) {
-                        probeReg1.reply(new RpcResponse(XmlUtils.outputCompositeNodeToXml(
-                                output, schemaContext)));
-                    } else {
-                        probeReg1.reply(new akka.actor.Status.Failure(error));
-                    }
-
-                } catch(AssertionError e) {
-                    assertError.set(e);
-                }
-            }
-
-        }.start();
-
-        return invokeRpcMsg;
-    }
 
     private RemoteRpcProviderConfig getConfig(){
         return new RemoteRpcProviderConfig.Builder("unit-test").build();
index 8b4599ca8ceac01684376b63be3dded36e354e2b..5cd3df3a245d0d645059f26637d68b6587fc3af0 100644 (file)
 package org.opendaylight.controller.remote.rpc;
 
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Test;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.Await;
-import scala.concurrent.duration.Duration;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class RemoteRpcProviderTest {
 
@@ -38,7 +24,7 @@ public class RemoteRpcProviderTest {
   @BeforeClass
   public static void setup() throws InterruptedException {
     moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
-    Config config = moduleConfig.get();
+    final Config config = moduleConfig.get();
     system = ActorSystem.create("odl-cluster-rpc", config);
 
   }
@@ -49,21 +35,4 @@ public class RemoteRpcProviderTest {
     system = null;
   }
 
-  @Test
-  public void testRemoteRpcProvider() throws Exception {
-    RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(RpcProvisionRegistry.class));
-    Broker.ProviderSession session = mock(Broker.ProviderSession.class);
-    SchemaService schemaService = mock(SchemaService.class);
-    when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
-    when(session.getService(SchemaService.class)).thenReturn(schemaService);
-
-    rpcProvider.onSessionInitiated(session);
-
-    ActorRef actorRef = Await.result(
-            system.actorSelection(
-                    moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
-                                                                 Duration.create(2, TimeUnit.SECONDS));
-
-    Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
-  }
 }
index 28b1b476cd0ebc4d20585b58a91877c076caf33c..16b13910a8f91539f0ec4a085fe589485737d6b0 100644 (file)
@@ -9,269 +9,8 @@
 package org.opendaylight.controller.remote.rpc;
 
 
-import akka.actor.ActorRef;
-import akka.japi.Pair;
-import akka.testkit.JavaTestKit;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import static org.junit.Assert.assertEquals;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
-import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import org.opendaylight.controller.xml.codec.XmlUtils;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.any;
 
 public class RpcBrokerTest extends AbstractRpcTest {
 
-    @Test
-    public void testInvokeRpcWithNoRemoteActor() throws Exception {
-        new JavaTestKit(node1) {{
-            CompositeNode input = makeRPCInput("foo");
-
-            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input);
-            rpcBroker1.tell(invokeMsg, getRef());
-
-            probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
-            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
-                    Collections.<Pair<ActorRef, Long>>emptyList()));
-
-            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
-                    akka.actor.Status.Failure.class);
-
-            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
-        }};
-    }
-
-
-    /**
-     * This test method invokes and executes the remote rpc
-     */
-    //@Test
-    public void testInvokeRpc() throws URISyntaxException {
-        new JavaTestKit(node1) {{
-            QName instanceQName = new QName(new URI("ns"), "instance");
-
-            CompositeNode invokeRpcResult = makeRPCOutput("bar");
-            RpcResult<CompositeNode> rpcResult =
-                               RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
-            ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
-            when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
-                    .thenReturn(Futures.immediateFuture(rpcResult));
-
-            // invoke rpc
-            CompositeNode input = makeRPCInput("foo");
-            YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName);
-            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input);
-            rpcBroker1.tell(invokeMsg, getRef());
-
-            FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-            RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-            assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-            assertEquals("getRoute", instanceID, routeIdentifier.getRoute());
-
-            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
-                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
-
-            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
-            assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
-                    XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
-            assertCompositeNodeEquals(input, inputCaptor.getValue());
-        }};
-    }
-
-    @Test
-    public void testInvokeRpcWithNoOutput() {
-        new JavaTestKit(node1) {{
-
-            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>success().build();
-            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
-                    .thenReturn(Futures.immediateFuture(rpcResult));
-
-            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
-            rpcBroker1.tell(invokeMsg, getRef());
-
-            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
-                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
-
-            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
-
-            assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode());
-        }};
-    }
-
-    @Test
-    public void testInvokeRpcWithExecuteFailure() {
-        new JavaTestKit(node1) {{
-
-            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
-                    .withError(ErrorType.RPC, "tag", "error", "appTag", "info",
-                            new Exception("mock"))
-                    .build();
-            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
-                    .thenReturn(Futures.immediateFuture(rpcResult));
-
-            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
-            rpcBroker1.tell(invokeMsg, getRef());
-
-            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
-                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
-
-            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
-                    akka.actor.Status.Failure.class);
-
-            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
-
-            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
-            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
-            assertEquals("RpcErrors count", 1, rpcErrors.size());
-            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag",
-                    "error", "appTag", "info", "mock");
-        }};
-    }
-
-    @Test
-    public void testInvokeRpcWithFindRoutersFailure() {
-        new JavaTestKit(node1) {{
-
-            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
-            rpcBroker1.tell(invokeMsg, getRef());
-
-            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-            probeReg1.reply(new akka.actor.Status.Failure(new TestException()));
-
-            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
-                    akka.actor.Status.Failure.class);
-
-            assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
-        }};
-    }
-
-    @Test
-    public void testExecuteRpc() {
-        new JavaTestKit(node1) {{
-
-            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
-
-            CompositeNode invokeRpcResult = makeRPCOutput("bar");
-            RpcResult<CompositeNode> rpcResult =
-                               RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
-            ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
-            when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
-                    .thenReturn(Futures.immediateFuture(rpcResult));
-
-            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
-
-            rpcBroker1.tell(executeMsg, getRef());
-
-            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
-
-            assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
-                    XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
-        }};
-    }
-
-    @Test
-    public void testExecuteRpcFailureWithRpcErrors() {
-        new JavaTestKit(node1) {{
-
-            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
-
-            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
-                    .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1",
-                            new Exception("mock"))
-                    .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null)
-                    .build();
-            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
-                    .thenReturn(Futures.immediateFuture(rpcResult));
-
-            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
-
-            rpcBroker1.tell(executeMsg, getRef());
-
-            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
-                    akka.actor.Status.Failure.class);
-
-            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
-
-            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
-            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
-            assertEquals("RpcErrors count", 2, rpcErrors.size());
-            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1",
-                    "error", "appTag1", "info1", "mock");
-            assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2",
-                    "warning", "appTag2", "info2", null);
-        }};
-    }
-
-    @Test
-    public void testExecuteRpcFailureWithNoRpcErrors() {
-        new JavaTestKit(node1) {{
-
-            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
-
-            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed().build();
-            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
-                    .thenReturn(Futures.immediateFuture(rpcResult));
-
-            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
-
-            rpcBroker1.tell(executeMsg, getRef());
-
-            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
-                    akka.actor.Status.Failure.class);
-
-            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
-
-            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
-            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
-            assertEquals("RpcErrors count", 1, rpcErrors.size());
-            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC,
-                    "operation-failed", "failed", null, null, null);
-        }};
-    }
-
-    @Test
-    public void testExecuteRpcFailureWithException() {
-        new JavaTestKit(node1) {{
-
-            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
-
-            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
-                    .thenReturn(Futures.<RpcResult<CompositeNode>>immediateFailedFuture(
-                            new TestException()));
-
-            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
-
-            rpcBroker1.tell(executeMsg, getRef());
-
-            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
-                    akka.actor.Status.Failure.class);
 
-            assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
-        }};
-    }
 }
index 956e1599904ccc52303ff380892bc3531861cd5b..ecfaef841979421553297616d3c5ec84fd0c6ade 100644 (file)
@@ -8,18 +8,11 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.ConfigFactory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.yangtools.yang.common.QName;
-
-import java.net.URI;
-import java.net.URISyntaxException;
 
 public class RpcListenerTest {
 
@@ -37,37 +30,4 @@ public class RpcListenerTest {
     system = null;
   }
 
-  @Test
-  public void testRpcAdd() throws URISyntaxException {
-    new JavaTestKit(system) {
-      {
-        JavaTestKit probeReg = new JavaTestKit(system);
-        ActorRef rpcRegistry = probeReg.getRef();
-
-        RpcListener rpcListener = new RpcListener(rpcRegistry);
-
-        QName qName = new QName(new URI("actor2"), "actor2");
-
-        rpcListener.onRpcImplementationAdded(qName);
-        probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class);
-      }};
-
-  }
-
-  @Test
-  public void testRpcRemove() throws URISyntaxException {
-    new JavaTestKit(system) {
-      {
-        JavaTestKit probeReg = new JavaTestKit(system);
-        ActorRef rpcRegistry = probeReg.getRef();
-
-        RpcListener rpcListener = new RpcListener(rpcRegistry);
-
-        QName qName = new QName(new URI("actor2"), "actor2");
-
-        rpcListener.onRpcImplementationRemoved(qName);
-        probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class);
-      }};
-
-  }
 }