2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.sal.connector.remoterpc;
11 import java.io.IOException;
12 import java.io.PrintWriter;
13 import java.io.StringWriter;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ThreadPoolExecutor;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
21 import org.opendaylight.controller.sal.connector.api.RpcRouter;
22 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
23 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
24 import org.opendaylight.controller.sal.core.api.Broker;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.zeromq.ZMQ;
35 public class ServerRequestHandler implements AutoCloseable{
37 private final Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
38 private final String DEFAULT_NAME = "remote-rpc-worker";
39 private final String dealerAddress;
40 private final String serverAddress;
41 private final int workerCount;
42 private final ZMQ.Context context;
43 private final Broker.ProviderSession broker;
45 private RequestHandlerThreadPool workerPool;
46 private final AtomicInteger threadId = new AtomicInteger();
48 public ServerRequestHandler(ZMQ.Context context,
49 Broker.ProviderSession session,
52 String serverAddress) {
53 this.context = context;
54 this.dealerAddress = dealerAddress;
55 this.serverAddress = serverAddress;
56 this.broker = session;
57 this.workerCount = workerCount;
60 public ThreadPoolExecutor getWorkerPool(){
65 workerPool = new RequestHandlerThreadPool(
66 workerCount, workerCount,
67 0L, TimeUnit.MILLISECONDS,
68 new LinkedBlockingQueue<Runnable>());
69 //unbound is ok. Task will never be submitted
71 for (int i=0;i<workerCount;i++){
72 workerPool.execute(new Worker(threadId.incrementAndGet()));
77 * This gets called automatically if used with try-with-resources
81 public void close() throws Exception {
82 if (workerPool != null)
83 workerPool.shutdown();
84 _logger.info("Request Handler closed");
88 * Worker to handles RPC request
90 private class Worker implements Runnable {
91 private final String name;
93 public Worker(int id){
94 this.name = DEFAULT_NAME + "-" + id;
99 Thread.currentThread().setName(name);
100 _logger.debug("Starting... ");
101 ZMQ.Socket socket = null;
104 socket = context.socket(ZMQ.REP);
105 socket.connect(dealerAddress);
107 while (!Thread.currentThread().isInterrupted()) {
109 MessageHandler handler = new MessageHandler(socket);
110 handler.receiveMessage();
112 if (handler.hasMessageForBroker()) {
114 Message request = handler.getMessage();
115 Future<RpcResult<CompositeNode>> rpc = null;
116 RpcResult<CompositeNode> result = null;
118 //TODO Call this in a new thread with timeout
121 (QName) request.getRoute().getType(),
122 XmlUtils.xmlToCompositeNode((String) request.getPayload()));
124 result = (rpc != null) ? rpc.get() : null;
126 handler.sendResponse(result);
128 } catch (Exception e) {
129 _logger.debug("Broker threw [{}]", e);
130 handler.sendError(e.getMessage());
135 } catch (Exception e) {
142 private void printException(Exception e) {
143 try (StringWriter s = new StringWriter();
144 PrintWriter p = new PrintWriter(s)) {
145 e.printStackTrace(p);
146 _logger.debug(s.toString());
147 } catch (IOException e1) {/*Ignore and continue*/ }
150 private void closeSocket(ZMQ.Socket socket) {
152 if (socket != null) socket.close();
153 } catch (Exception x) {
154 _logger.debug("Exception while closing socket [{}]", x);
156 if (socket != null) socket.close();
158 _logger.debug("Closing...");
166 public class RequestHandlerThreadPool extends ThreadPoolExecutor{
168 public RequestHandlerThreadPool(int corePoolSize,
172 BlockingQueue<Runnable> workQueue) {
173 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
177 protected void afterExecute(Runnable r, Throwable t) {
178 if (isTerminating() || isTerminated() || isShutdown())
182 _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
185 this.execute(new Worker(threadId.incrementAndGet()));
186 super.afterExecute(r, null);
190 class MessageHandler{
191 private final ZMQ.Socket socket;
192 private Message message; //parsed message received on zmq server port
193 private boolean messageForBroker = false; //if the message is valid and not a "ping" message
195 public MessageHandler(ZMQ.Socket socket){
196 this.socket = socket;
199 void receiveMessage(){
200 byte[] bytes = socket.recv(); //this blocks
201 _logger.debug("Received bytes:[{}]", bytes.length);
203 Object objectRecvd = null;
205 objectRecvd = Message.deserialize(bytes);
206 }catch (Exception e){
207 sendError(e.getMessage());
211 if (!(objectRecvd instanceof Message)) {
212 sendError("Invalid message received");
216 message = (Message) objectRecvd;
218 _logger.info("Received request [{}]", message);
220 if (Message.MessageType.PING == message.getType()){
225 messageForBroker = true;
228 boolean hasMessageForBroker(){
229 return messageForBroker;
232 Message getMessage(){
236 void sendResponse(RpcResult<CompositeNode> result){
237 CompositeNode payload = (result != null) ? result.getResult() : null;
239 String recipient = null;
240 RpcRouter.RouteIdentifier<?, ?, ?> routeId = null;
242 if (message != null) {
243 recipient = message.getSender();
244 routeId = message.getRoute();
247 Message response = new Message.MessageBuilder()
248 .type(Message.MessageType.RESPONSE)
249 .sender(serverAddress)
250 .recipient(recipient)
252 .payload(XmlUtils.compositeNodeToXml(payload))
258 private void sendError(String msg){
259 Message errorResponse = new Message.MessageBuilder()
260 .type(Message.MessageType.ERROR)
261 .sender(serverAddress)
268 private void sendPong(){
269 Message pong = new Message.MessageBuilder()
270 .type(Message.MessageType.PONG)
271 .sender(serverAddress)
277 private void send(Message msg){
278 byte[] serializedMessage = null;
280 serializedMessage = Message.serialize(msg);
281 } catch (Exception e) {
282 _logger.debug("Unexpected error during serialization of response [{}]", msg);
286 if (serializedMessage != null)
287 if (socket.send(serializedMessage))
288 _logger.info("Response sent [{}]", msg);
289 else _logger.debug("Failed to send serialized message");