Merge "BUG 484 Anyxml normalized node"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientImpl.java
index 291fe0b8e73a5a52e65f1d6e9be4fcca31290a50..200ebaee6a5a0fc2b34b7d97c52ce2350b8ee3a1 100644 (file)
@@ -7,45 +7,48 @@
 
 package org.opendaylight.controller.sal.connector.remoterpc;
 
-import com.google.common.base.Optional;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
- * An implementation of {@link RpcImplementation} that makes
+ * An implementation of {@link org.opendaylight.controller.sal.core.api.RpcImplementation} that makes
  * remote RPC calls
  */
 public class ClientImpl implements RemoteRpcClient {
 
   private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
 
-  private ZMQ.Context context = ZMQ.context(1);
-  private ClientRequestHandler handler;
+  private final ZMQ.Context context = ZMQ.context(1);
+  private final ClientRequestHandler handler;
   private RoutingTableProvider routingTableProvider;
 
   public ClientImpl(){
@@ -62,16 +65,11 @@ public class ClientImpl implements RemoteRpcClient {
     return routingTableProvider;
   }
 
+  @Override
   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
     this.routingTableProvider = routingTableProvider;
   }
 
- @Override
-  public Set<QName> getSupportedRpcs(){
-    //TODO: Find the entries from routing table
-    return Collections.emptySet();
-  }
-
   @Override
   public void start() {/*NOOPS*/}
 
@@ -97,14 +95,40 @@ public class ClientImpl implements RemoteRpcClient {
    * @param input payload for the remote service
    * @return
    */
-  @Override
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+    routeId.setType(rpc);
+
+    String address = lookupRemoteAddressForGlobalRpc(routeId);
+    return sendMessage(input, routeId, address);
+  }
+
+  /**
+   * Finds remote server that can execute this routed rpc and sends a message to it
+   * requesting execution.
+   * The call blocks until a response from remote server is received. Its upto
+   * the client of this API to implement a timeout functionality.
+   *
+   * @param rpc
+   *          rpc to be called
+   * @param identifier
+   *          instance identifier on which rpc is to be executed
+   * @param input
+   *          payload
+   * @return
+   */
+  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
 
     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
     routeId.setType(rpc);
+    routeId.setRoute(identifier);
 
-    String address = lookupRemoteAddress(routeId);
+    String address = lookupRemoteAddressForRpc(routeId);
 
+    return sendMessage(input, routeId, address);
+  }
+
+  private ListenableFuture<RpcResult<CompositeNode>> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
     Message request = new Message.MessageBuilder()
         .type(Message.MessageType.REQUEST)
         .sender(Context.getInstance().getLocalUri())
@@ -119,16 +143,35 @@ public class ClientImpl implements RemoteRpcClient {
       Message response = handler.handle(request);
       CompositeNode payload = null;
 
-      if ( response != null )
-        payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+      if ( response != null ) {
+
+        _logger.info("Received response [{}]", response);
+
+        Object rawPayload = response.getPayload();
+        switch (response.getType()) {
+          case ERROR:
+            if ( rawPayload instanceof List )
+              errors = (List) rawPayload;
+              break;
+
+          case RESPONSE:
+            payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
+            break;
 
-      return Rpcs.getRpcResult(true, payload, errors);
+          default:
+            errors.add(
+                RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
+            );
+            break;
+
+        }
+      }
+      return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors));
 
     } catch (Exception e){
       collectErrors(e, errors);
-      return Rpcs.getRpcResult(false, null, errors);
+      return Futures.immediateFuture(Rpcs.<CompositeNode>getRpcResult(false, null, errors));
     }
-
   }
 
   /**
@@ -136,19 +179,36 @@ public class ClientImpl implements RemoteRpcClient {
    * @param  routeId route identifier
    * @return         remote network address
    */
-  private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
+  private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
     checkNotNull(routeId, "route must not be null");
 
-    Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+    Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
     checkNotNull(routingTable.isPresent(), "Routing table is null");
 
-    Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
-    checkNotNull(addresses, "Address not found for route [%s]", routeId);
-    checkState(addresses.size() == 1,
-        "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
+    String address = null;
+    try {
+      address = routingTable.get().getGlobalRoute(routeId);
+    } catch (RoutingTableException|SystemException e) {
+      _logger.error("Exception caught while looking up remote address " + e);
+    }
+    checkState(address != null, "Address not found for route [%s]", routeId);
+
+    return address;
+  }
+
+  /**
+   * Find address for the given route identifier in routing table
+   * @param  routeId route identifier
+   * @return         remote network address
+   */
+  private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+    checkNotNull(routeId, "route must not be null");
+
+    Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
+    checkNotNull(routingTable.isPresent(), "Routing table is null");
 
-    String address = addresses.iterator().next();
-    checkNotNull(address, "Address not found for route [%s]", routeId);
+    String address = routingTable.get().getLastAddedRoute(routeId);
+    checkState(address != null, "Address not found for route [%s]", routeId);
 
     return address;
   }
@@ -169,7 +229,7 @@ public class ClientImpl implements RemoteRpcClient {
    */
   private void closeZmqContext() {
     ExecutorService exec = Executors.newSingleThreadExecutor();
-    FutureTask zmqTermination = new FutureTask(new Runnable() {
+    FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
 
       @Override
       public void run() {