Enhancements to remote rpc server. Using zmq router-dealer bridge to make the server...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerRequestHandler.java
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java
new file mode 100644 (file)
index 0000000..949e6ee
--- /dev/null
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ */
+public class ServerRequestHandler implements AutoCloseable{
+
+  private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
+  private final String DEFAULT_NAME = "remote-rpc-worker";
+  private String dealerAddress;
+  private String serverAddress;
+  private int workerCount;
+  private ZMQ.Context context;
+  private Broker.ProviderSession broker;
+
+  private RequestHandlerThreadPool workerPool;
+  private final AtomicInteger threadId = new AtomicInteger();
+
+  public ServerRequestHandler(ZMQ.Context context,
+                              Broker.ProviderSession session,
+                              int workerCount,
+                              String dealerAddress,
+                              String serverAddress) {
+    this.context       = context;
+    this.dealerAddress = dealerAddress;
+    this.serverAddress = serverAddress;
+    this.broker        = session;
+    this.workerCount   = workerCount;
+  }
+
+  public ThreadPoolExecutor getWorkerPool(){
+    return workerPool;
+  }
+
+  public void start(){
+    workerPool = new RequestHandlerThreadPool(
+        workerCount, workerCount,
+        0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>());
+    //unbound is ok. Task will never be submitted
+
+    for (int i=0;i<workerCount;i++){
+      workerPool.execute(new Worker(threadId.incrementAndGet()));
+    }
+  }
+
+  /**
+   * This gets called automatically if used with try-with-resources
+   * @throws Exception
+   */
+  @Override
+  public void close() throws Exception {
+    if (workerPool != null)
+      workerPool.shutdown();
+    _logger.info("Request Handler closed");
+  }
+
+  /**
+   * Worker to handles RPC request
+   */
+  private class Worker implements Runnable {
+    private String name;
+
+    public Worker(int id){
+      this.name = DEFAULT_NAME + "-" + id;
+    }
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName(name);
+      _logger.debug("Starting... ");
+      ZMQ.Socket socket = null;
+
+      try {
+        socket = context.socket(ZMQ.REP);
+        socket.connect(dealerAddress);
+
+        while (!Thread.currentThread().isInterrupted()) {
+
+          Message request = parseMessage(socket);
+          _logger.debug("Received rpc request [{}]", request);
+
+          if (request != null) {
+            // Call broker to process the message then reply
+            Future<RpcResult<CompositeNode>> rpc = null;
+            RpcResult<CompositeNode> result = null;
+
+            //TODO Call this in a new thread with timeout
+            try {
+              rpc = broker.rpc(
+                  (QName) request.getRoute().getType(),
+                  XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+
+              result = (rpc != null) ? rpc.get() : null;
+
+            } catch (Exception e) {
+              _logger.debug("Broker threw  [{}]", e);
+            }
+
+            CompositeNode payload = (result != null) ? result.getResult() : null;
+
+            Message response = new Message.MessageBuilder()
+                .type(Message.MessageType.RESPONSE)
+                .sender(serverAddress)
+                .route(request.getRoute())
+                .payload(XmlUtils.compositeNodeToXml(payload))
+                .build();
+
+            _logger.debug("Sending rpc response [{}]", response);
+
+            try {
+              socket.send(Message.serialize(response));
+            } catch (Exception e) {
+              _logger.debug("rpc response send failed for message [{}]", response);
+              _logger.debug("{}", e);
+            }
+          }
+        }
+      } catch (Exception e) {
+        printException(e);
+      } finally {
+        closeSocket(socket);
+      }
+    }
+
+    /**
+     * @param socket
+     * @return
+     */
+    private Message parseMessage(ZMQ.Socket socket) throws Exception {
+      byte[] bytes = socket.recv(); //this blocks
+      _logger.debug("Received bytes:[{}]", bytes.length);
+      return (Message) Message.deserialize(bytes);
+    }
+
+    private void printException(Exception e) {
+      try (StringWriter s = new StringWriter();
+           PrintWriter p = new PrintWriter(s)) {
+        e.printStackTrace(p);
+        _logger.debug(s.toString());
+      } catch (IOException e1) {/*Ignore and continue*/ }
+    }
+
+    private void closeSocket(ZMQ.Socket socket) {
+      try {
+        if (socket != null) socket.close();
+      } catch (Exception x) {
+        _logger.debug("Exception while closing socket [{}]", x);
+      } finally {
+        if (socket != null) socket.close();
+      }
+      _logger.debug("Closing...");
+    }
+  }
+
+
+  /**
+   *
+   */
+  public class RequestHandlerThreadPool extends ThreadPoolExecutor{
+
+    public RequestHandlerThreadPool(int corePoolSize,
+                                    int maximumPoolSize,
+                                    long keepAliveTime,
+                                    TimeUnit unit,
+                                    BlockingQueue<Runnable> workQueue) {
+      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+      if (isTerminating() || isTerminated() || isShutdown())
+        return;
+
+      if ( t != null ){
+        _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
+      }
+
+      this.execute(new Worker(threadId.incrementAndGet()));
+      super.afterExecute(r, null);
+    }
+  }
+}