2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.sal.connector.remoterpc;
11 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
12 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
13 import org.opendaylight.controller.sal.core.api.Broker;
14 import org.opendaylight.yangtools.yang.common.QName;
15 import org.opendaylight.yangtools.yang.common.RpcResult;
16 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import org.zeromq.ZMQ;
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.util.concurrent.*;
25 import java.util.concurrent.atomic.AtomicInteger;
30 public class ServerRequestHandler implements AutoCloseable{
32 private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
33 private final String DEFAULT_NAME = "remote-rpc-worker";
34 private String dealerAddress;
35 private String serverAddress;
36 private int workerCount;
37 private ZMQ.Context context;
38 private Broker.ProviderSession broker;
40 private RequestHandlerThreadPool workerPool;
41 private final AtomicInteger threadId = new AtomicInteger();
// Constructor: the visible body only copies its arguments into the matching fields.
//
//   context       - stored in this.context
//   session       - stored in this.broker
//   serverAddress - stored in this.serverAddress
//
// NOTE(review): the body also assigns this.dealerAddress and this.workerCount from
// identically named variables, but their parameter declarations are not visible in this
// extract (the embedded numbering skips from 44 to 47). Confirm their types and position
// in the parameter list against the full file.
43 public ServerRequestHandler(ZMQ.Context context,
44 Broker.ProviderSession session,
47 String serverAddress) {
48 this.context = context;
49 this.dealerAddress = dealerAddress;
50 this.serverAddress = serverAddress;
51 this.broker = session;
52 this.workerCount = workerCount;
// Accessor for the worker pool (presumably returns the workerPool field - the return
// statement is not visible here).
// NOTE(review): the embedded numbering jumps from 55 to 60 right after this signature, so the
// remainder of the accessor and the header of whichever method owns the statements below are
// missing from this extract. Confirm against the full file before relying on this layout.
55 public ThreadPoolExecutor getWorkerPool(){
// Create the pool: workerCount is passed for both leading size arguments, followed by a
// 0 ms duration and a LinkedBlockingQueue built without a capacity argument.
60 workerPool = new RequestHandlerThreadPool(
61 workerCount, workerCount,
62 0L, TimeUnit.MILLISECONDS,
63 new LinkedBlockingQueue<Runnable>());
64 //unbound is ok. Task will never be submitted
// Hand workerCount Worker tasks to the pool; each one receives the next value of threadId.
66 for (int i=0;i<workerCount;i++){
67 workerPool.execute(new Worker(threadId.incrementAndGet()));
72 * This gets called automatically if used with try-with-resources
76 public void close() throws Exception {
77 if (workerPool != null)
78 workerPool.shutdown();
79 _logger.info("Request Handler closed");
83 * Worker that handles RPC requests
85 private class Worker implements Runnable {
88 public Worker(int id){
89 this.name = DEFAULT_NAME + "-" + id;
// Request loop of a Worker (the enclosing class implements Runnable).
// NOTE(review): the method signature and several structural lines are missing from this
// extract - the embedded numbering has gaps (for example 97-98, 113-114, 131-132, 134-135 and
// everything after 143). The try openers, the call that assigns `rpc`, the end of the builder
// chain and the handling after the last catch are therefore not visible. Confirm against the
// full file before editing this code.
// Label the executing thread with this worker's name.
94 Thread.currentThread().setName(name);
95 _logger.debug("Starting... ");
96 ZMQ.Socket socket = null;
// Each worker creates its own REP socket and connects it to dealerAddress.
99 socket = context.socket(ZMQ.REP);
100 socket.connect(dealerAddress);
// Keep serving until this thread's interrupt flag is set.
102 while (!Thread.currentThread().isInterrupted()) {
// parseMessage blocks in socket.recv() until bytes arrive.
104 Message request = parseMessage(socket);
105 _logger.debug("Received rpc request [{}]", request);
// A null request is skipped without sending any reply.
107 if (request != null) {
108 // Call broker to process the message then reply
109 Future<RpcResult<CompositeNode>> rpc = null;
110 RpcResult<CompositeNode> result = null;
112 //TODO Call this in a new thread with timeout
// Arguments of the invocation: the route type cast to QName, and the payload string parsed
// into a CompositeNode. NOTE(review): the line holding the callee is not visible here.
115 (QName) request.getRoute().getType(),
116 XmlUtils.xmlToCompositeNode((String) request.getPayload()));
// rpc.get() has no timeout (see the TODO above), so this waits for the future to complete.
118 result = (rpc != null) ? rpc.get() : null;
// NOTE(review): presumably this catch pairs with a try around the invocation above. If so,
// an InterruptedException from rpc.get() is swallowed here without re-interrupting the
// thread, and the loop condition will not observe it - confirm.
120 } catch (Exception e) {
121 _logger.debug("Broker threw [{}]", e);
// On the visible path a failed or absent result yields a null payload, and a response is
// still built and sent.
124 CompositeNode payload = (result != null) ? result.getResult() : null;
// Build a RESPONSE message that echoes the request's route and names serverAddress as sender.
126 Message response = new Message.MessageBuilder()
127 .type(Message.MessageType.RESPONSE)
128 .sender(serverAddress)
129 .route(request.getRoute())
130 .payload(XmlUtils.compositeNodeToXml(payload))
133 _logger.debug("Sending rpc response [{}]", response);
// A send failure is only logged at debug level; the loop then continues with the next request.
136 socket.send(Message.serialize(response));
137 } catch (Exception e) {
138 _logger.debug("rpc response send failed for message [{}]", response);
139 _logger.debug("{}", e);
// Outer handler for anything thrown by the socket setup or the loop above.
// NOTE(review): its body and any finally block are not visible in this extract.
143 } catch (Exception e) {
154 private Message parseMessage(ZMQ.Socket socket) throws Exception {
155 byte[] bytes = socket.recv(); //this blocks
156 _logger.debug("Received bytes:[{}]", bytes.length);
157 return (Message) Message.deserialize(bytes);
160 private void printException(Exception e) {
161 try (StringWriter s = new StringWriter();
162 PrintWriter p = new PrintWriter(s)) {
163 e.printStackTrace(p);
164 _logger.debug(s.toString());
165 } catch (IOException e1) {/*Ignore and continue*/ }
168 private void closeSocket(ZMQ.Socket socket) {
170 if (socket != null) socket.close();
171 } catch (Exception x) {
172 _logger.debug("Exception while closing socket [{}]", x);
174 if (socket != null) socket.close();
176 _logger.debug("Closing...");
// ThreadPoolExecutor subclass used for the Worker tasks. Its afterExecute hook submits a new
// Worker once a task has finished.
184 public class RequestHandlerThreadPool extends ThreadPoolExecutor{
// Forwards all of its arguments unchanged to the ThreadPoolExecutor constructor.
// NOTE(review): the declarations of maximumPoolSize, keepAliveTime and unit are not visible in
// this extract (the embedded numbering skips 187-189); they are only seen in the super(...) call.
186 public RequestHandlerThreadPool(int corePoolSize,
190 BlockingQueue<Runnable> workQueue) {
191 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
// Hook that ThreadPoolExecutor invokes after a task completes; r is the finished task and t is
// the throwable that ended it, if any.
195 protected void afterExecute(Runnable r, Throwable t) {
196 if (isTerminating() || isTerminated() || isShutdown())
// NOTE(review): the statement controlled by the if above, and any guard around the log call
// below, are not visible (the numbering skips 197-199). Presumably the hook returns early while
// the pool is shutting down and t is null-checked before being dereferenced - confirm.
200 _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
// Submit a replacement Worker carrying the next id from threadId.
203 this.execute(new Worker(threadId.incrementAndGet()));
// null is passed instead of t, so the superclass hook does not receive the task's throwable.
204 super.afterExecute(r, null);