Enhancements to remote rpc server. Using zmq router-dealer bridge to make the server...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientImpl.java
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java
new file mode 100644 (file)
index 0000000..291fe0b
--- /dev/null
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * An implementation of {@link RpcImplementation} that makes
+ * remote RPC calls
+ */
+public class ClientImpl implements RemoteRpcClient {
+
+  private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
+
+  private ZMQ.Context context = ZMQ.context(1);
+  private ClientRequestHandler handler;
+  private RoutingTableProvider routingTableProvider;
+
+  public ClientImpl(){
+    handler = new ClientRequestHandler(context);
+    start();
+  }
+
+  public ClientImpl(ClientRequestHandler handler){
+    this.handler = handler;
+    start();
+  }
+
+  public RoutingTableProvider getRoutingTableProvider() {
+    return routingTableProvider;
+  }
+
+  public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+    this.routingTableProvider = routingTableProvider;
+  }
+
+ @Override
+  public Set<QName> getSupportedRpcs(){
+    //TODO: Find the entries from routing table
+    return Collections.emptySet();
+  }
+
+  @Override
+  public void start() {/*NOOPS*/}
+
+  @Override
+  public void stop() {
+    closeZmqContext();
+    handler.close();
+    _logger.info("Stopped");
+  }
+
+  @Override
+  public void close(){
+    stop();
+  }
+
+  /**
+   * Finds remote server that can execute this rpc and sends a message to it
+   * requesting execution.
+   * The call blocks until a response from remote server is received. Its upto
+   * the client of this API to implement a timeout functionality.
+   *
+   * @param rpc   remote service to be executed
+   * @param input payload for the remote service
+   * @return
+   */
+  @Override
+  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+    routeId.setType(rpc);
+
+    String address = lookupRemoteAddress(routeId);
+
+    Message request = new Message.MessageBuilder()
+        .type(Message.MessageType.REQUEST)
+        .sender(Context.getInstance().getLocalUri())
+        .recipient(address)
+        .route(routeId)
+        .payload(XmlUtils.compositeNodeToXml(input))
+        .build();
+
+    List<RpcError> errors = new ArrayList<RpcError>();
+
+    try{
+      Message response = handler.handle(request);
+      CompositeNode payload = null;
+
+      if ( response != null )
+        payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+
+      return Rpcs.getRpcResult(true, payload, errors);
+
+    } catch (Exception e){
+      collectErrors(e, errors);
+      return Rpcs.getRpcResult(false, null, errors);
+    }
+
+  }
+
+  /**
+   * Find address for the given route identifier in routing table
+   * @param  routeId route identifier
+   * @return         remote network address
+   */
+  private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
+    checkNotNull(routeId, "route must not be null");
+
+    Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+    checkNotNull(routingTable.isPresent(), "Routing table is null");
+
+    Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
+    checkNotNull(addresses, "Address not found for route [%s]", routeId);
+    checkState(addresses.size() == 1,
+        "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
+
+    String address = addresses.iterator().next();
+    checkNotNull(address, "Address not found for route [%s]", routeId);
+
+    return address;
+  }
+
+  private void collectErrors(Exception e, List<RpcError> errors){
+    if (e == null) return;
+    if (errors == null) errors = new ArrayList<RpcError>();
+
+    errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+    for (Throwable t : e.getSuppressed()) {
+      errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
+    }
+  }
+
+  /**
+   * Closes ZMQ Context. It tries to gracefully terminate the context. If
+   * termination takes more than a second, its forcefully shutdown.
+   */
+  private void closeZmqContext() {
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    FutureTask zmqTermination = new FutureTask(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          if (context != null)
+            context.term();
+          _logger.debug("ZMQ Context terminated");
+        } catch (Exception e) {
+          _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
+        }
+      }
+    }, null);
+
+    exec.execute(zmqTermination);
+
+    try {
+      zmqTermination.get(1L, TimeUnit.SECONDS);
+    } catch (Exception e) {/*ignore and continue with shutdown*/}
+
+    exec.shutdownNow();
+  }
+}