2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.sal.connector.remoterpc;
11 import org.opendaylight.controller.sal.connector.api.RpcRouter;
12 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
13 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
14 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
15 import org.opendaylight.controller.sal.core.api.Broker;
16 import org.opendaylight.yangtools.yang.common.QName;
17 import org.opendaylight.yangtools.yang.common.RpcError;
18 import org.opendaylight.yangtools.yang.common.RpcResult;
19 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 import org.zeromq.ZMQ;
24 import java.io.IOException;
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.Collection;
28 import java.util.concurrent.*;
29 import java.util.concurrent.atomic.AtomicInteger;
// Handles remote RPC requests on the server side using a pool of Worker tasks,
// each of which serves requests from its own ZMQ REP socket.
// Implements AutoCloseable so the pool can be shut down through close().
34 public class ServerRequestHandler implements AutoCloseable{
// Logger used by this class and by its nested classes.
36 private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
// Prefix of worker names; the Worker constructor appends "-" and the worker id.
37 private final String DEFAULT_NAME = "remote-rpc-worker";
// Address that every worker socket connects to.
38 private String dealerAddress;
// Reported as the sender of every RESPONSE, ERROR and PONG message built by MessageHandler.
39 private String serverAddress;
// Number of Worker tasks submitted; also used as both core and maximum pool size.
40 private int workerCount;
// ZMQ context from which the worker sockets are created.
41 private ZMQ.Context context;
// Provider session supplied at construction.
// NOTE(review): presumably the target of the RPC invocation made by Worker; that call site
// is not visible in this listing - confirm.
42 private Broker.ProviderSession broker;
// Not assigned by the constructor; set when the pool is built. close() tolerates null.
44 private RequestHandlerThreadPool workerPool;
// Source of worker ids; incremented once for every Worker that is created.
45 private final AtomicInteger threadId = new AtomicInteger();
// Stores the collaborators and settings used later by the workers.
// No pool and no socket is created here.
//   context       - ZMQ context used to create worker sockets
//   session       - provider session, kept in the broker field
//   serverAddress - address reported as sender on outgoing messages
// NOTE(review): workerCount and dealerAddress are assigned below, but their parameter
// declarations are not visible in this listing. If they are not parameters, those two
// assignments are field self-assignments - confirm against the full signature.
47 public ServerRequestHandler(ZMQ.Context context,
48 Broker.ProviderSession session,
51 String serverAddress) {
52 this.context = context;
53 this.dealerAddress = dealerAddress;
54 this.serverAddress = serverAddress;
55 this.broker = session;
56 this.workerCount = workerCount;
// Builds the worker pool and starts the workers.
// The pool is fixed-size: core and maximum sizes are both workerCount, the keep-alive is
// 0 ms and the work queue is an unbounded LinkedBlockingQueue. One Worker is submitted per
// pool slot, each with the next id taken from threadId.
// NOTE(review): neither a return statement nor a guard against rebuilding an existing pool
// is visible in this listing - confirm how callers obtain the pool and whether this code
// can run more than once.
59 public ThreadPoolExecutor getWorkerPool(){
64 workerPool = new RequestHandlerThreadPool(
65 workerCount, workerCount,
66 0L, TimeUnit.MILLISECONDS,
67 new LinkedBlockingQueue<Runnable>());
68 //unbound is ok. Task will never be submitted
// Fill every pool slot with a long-running worker.
70 for (int i=0;i<workerCount;i++){
71 workerPool.execute(new Worker(threadId.incrementAndGet()));
76 * This gets called automatically if used with try-with-resources
// Shuts down the worker pool, if one was created, and logs that the handler is closed.
// ThreadPoolExecutor.shutdown() stops accepting new tasks but does not interrupt tasks
// that are already running.
// NOTE(review): workers loop until their thread is interrupted and block in socket.recv(),
// so they may keep running after this call. Confirm whether shutdownNow() or terminating
// the ZMQ context elsewhere is what actually stops them.
80 public void close() throws Exception {
81 if (workerPool != null)
82 workerPool.shutdown();
83 _logger.info("Request Handler closed");
87 * Worker that handles RPC requests
// Task that serves RPC requests on a single ZMQ REP socket until its thread is interrupted.
// Each instance is named DEFAULT_NAME + "-" + id and applies that name to the thread running it.
89 private class Worker implements Runnable {
// id - number appended to DEFAULT_NAME to form this worker's name
92 public Worker(int id){
93 this.name = DEFAULT_NAME + "-" + id;
// NOTE(review): the lines from here on form the task body (presumably run()); the method
// header is not visible in this listing.
// Rename the executing thread so log output identifies the worker.
98 Thread.currentThread().setName(name);
99 _logger.debug("Starting... ");
100 ZMQ.Socket socket = null;
// Each worker owns one REP socket connected to the dealer address.
103 socket = context.socket(ZMQ.REP);
104 socket.connect(dealerAddress);
// Serve one request per iteration until the thread is interrupted.
106 while (!Thread.currentThread().isInterrupted()) {
// A fresh handler per request; receiveMessage() blocks in socket.recv().
108 MessageHandler handler = new MessageHandler(socket);
109 handler.receiveMessage();
// Only messages the handler flagged for the broker are processed further.
111 if (handler.hasMessageForBroker()) {
113 Message request = handler.getMessage();
114 Future<RpcResult<CompositeNode>> rpc = null;
115 RpcResult<CompositeNode> result = null;
117 //TODO Call this in a new thread with timeout
// Arguments of the RPC invocation: the route type cast to QName, and the payload string
// parsed from XML into a CompositeNode.
// NOTE(review): the head of this call is not visible in this listing; presumably it
// invokes the broker session and assigns the returned future to rpc - confirm.
120 (QName) request.getRoute().getType(),
121 XmlUtils.xmlToCompositeNode((String) request.getPayload()));
// Future.get() waits without a timeout; a null future yields a null result.
123 result = (rpc != null) ? rpc.get() : null;
125 handler.sendResponse(result);
// Any failure while handling the request is logged and reported back to the requester
// as an error message carrying e.getMessage().
127 } catch (Exception e) {
128 _logger.debug("Broker threw [{}]", e);
129 handler.sendError(e.getMessage());
// Outer handler for failures outside the per-request try block.
// NOTE(review): its body is not visible in this listing; presumably it logs through
// printException and the socket is released through closeSocket - confirm.
134 } catch (Exception e) {
// Logs the full stack trace of e at debug level.
// The trace is printed into an in-memory StringWriter through a PrintWriter, and the
// resulting text is handed to the logger as the message. Both writers are closed by the
// try-with-resources statement; an IOException raised while closing them is deliberately ignored.
141 private void printException(Exception e) {
142 try (StringWriter s = new StringWriter();
143 PrintWriter p = new PrintWriter(s)) {
144 e.printStackTrace(p);
145 _logger.debug(s.toString());
146 } catch (IOException e1) {/*Ignore and continue*/ }
// Closes the given socket if it is non-null, then logs "Closing..." at debug level.
// An exception thrown while closing is caught and logged at debug level.
// NOTE(review): socket.close() appears twice below. The keywords framing the second call
// are not visible in this listing (presumably a finally clause) - confirm that the repeated
// close is intended and safe for this socket type.
149 private void closeSocket(ZMQ.Socket socket) {
151 if (socket != null) socket.close();
152 } catch (Exception x) {
153 _logger.debug("Exception while closing socket [{}]", x);
155 if (socket != null) socket.close();
157 _logger.debug("Closing...");
// ThreadPoolExecutor that submits a new Worker from its afterExecute hook.
165 public class RequestHandlerThreadPool extends ThreadPoolExecutor{
// Passes all arguments straight through to the ThreadPoolExecutor constructor.
// NOTE(review): only corePoolSize and workQueue are visible in the parameter list here;
// maximumPoolSize, keepAliveTime and unit are used in the super call - confirm the signature.
167 public RequestHandlerThreadPool(int corePoolSize,
171 BlockingQueue<Runnable> workQueue) {
172 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
// Hook invoked by the executor after a task finishes; t is the throwable that ended the
// task, or null when it completed normally (per the ThreadPoolExecutor contract).
176 protected void afterExecute(Runnable r, Throwable t) {
// Detects a pool that is shutting down or already shut down.
// NOTE(review): the statement guarded by this condition is not visible in this listing;
// presumably it skips the re-submission below - confirm.
177 if (isTerminating() || isTerminated() || isShutdown())
// NOTE(review): t.getClass() throws if t is null; any guard around this line is not
// visible in this listing - confirm one exists.
181 _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
// Submit a new Worker with a fresh id.
184 this.execute(new Worker(threadId.incrementAndGet()));
// The superclass hook is called with a null throwable instead of t.
185 super.afterExecute(r, null);
// Receives one message from a worker's socket, records whether it must be forwarded to the
// broker, and builds and sends the reply (response, error or pong) on the same socket.
189 class MessageHandler{
// Socket used both to receive the request and to send the reply.
190 private ZMQ.Socket socket;
191 private Message message; //parsed message received on zmq server port
192 private boolean messageForBroker = false; //if the message is valid and not a "ping" message
// socket - the worker's socket on which this handler receives and replies
194 public MessageHandler(ZMQ.Socket socket){
195 this.socket = socket;
// Blocks until a message arrives on the socket, deserializes it and classifies it.
// On success the parsed message is stored in the message field; messageForBroker is set to
// true at the end of the method.
// NOTE(review): the error branches and the PING branch presumably return early so that
// messageForBroker stays false; those return statements are not visible in this listing - confirm.
198 void receiveMessage(){
199 byte[] bytes = socket.recv(); //this blocks
// NOTE(review): bytes is dereferenced without a null check - confirm recv() cannot return
// null here (for example when the thread is interrupted or the context is terminated).
200 _logger.debug("Received bytes:[{}]", bytes.length);
202 Object objectRecvd = null;
// NOTE(review): the bytes come straight from the socket; if Message.deserialize relies on
// Java native serialization, confirm that all peers are trusted.
204 objectRecvd = Message.deserialize(bytes);
// A deserialization failure is reported to the sender with the exception message.
205 }catch (Exception e){
206 sendError(e.getMessage());
// Anything that is not a Message (including null) is rejected with a fixed error text.
210 if (!(objectRecvd instanceof Message)) {
211 sendError("Invalid message received");
215 message = (Message) objectRecvd;
217 _logger.info("Received request [{}]", message);
// PING messages get special treatment.
// NOTE(review): the body of this branch is not visible in this listing; presumably it
// replies through sendPong() - confirm.
219 if (Message.MessageType.PING == message.getType()){
// Marks the message as one the worker must forward to the broker.
224 messageForBroker = true;
// Returns the messageForBroker flag, which is false until receiveMessage() sets it to true.
227 boolean hasMessageForBroker(){
228 return messageForBroker;
// Accessor for the received message.
// NOTE(review): the method body is not visible in this listing; presumably it returns the
// message field populated by receiveMessage() - confirm.
231 Message getMessage(){
// Builds a RESPONSE message for the given RPC result.
//   result - outcome of the RPC; may be null, in which case the payload passed on is null
// The reply is addressed to the sender of the received message when one is present;
// otherwise the recipient stays null.
// NOTE(review): routeId is computed below but its use is not visible in this listing, nor
// are the end of the builder chain and the actual send - confirm against the full method.
235 void sendResponse(RpcResult<CompositeNode> result){
236 CompositeNode payload = (result != null) ? result.getResult() : null;
238 String recipient = null;
239 RpcRouter.RouteIdentifier routeId = null;
// Echo the requester and route from the original message, if any.
241 if (message != null) {
242 recipient = message.getSender();
243 routeId = message.getRoute();
246 Message response = new Message.MessageBuilder()
247 .type(Message.MessageType.RESPONSE)
248 .sender(serverAddress)
249 .recipient(recipient)
251 .payload(XmlUtils.compositeNodeToXml(payload))
// Starts building an ERROR message whose sender is serverAddress.
//   msg - error text supplied by the caller
// NOTE(review): the rest of the builder chain (presumably where msg becomes the payload)
// and the send are not visible in this listing - confirm.
257 private void sendError(String msg){
258 Message errorResponse = new Message.MessageBuilder()
259 .type(Message.MessageType.ERROR)
260 .sender(serverAddress)
// Starts building a PONG message whose sender is serverAddress.
// NOTE(review): the rest of the builder chain and the send are not visible in this
// listing - confirm.
267 private void sendPong(){
268 Message pong = new Message.MessageBuilder()
269 .type(Message.MessageType.PONG)
270 .sender(serverAddress)
// Serializes msg and writes it to the socket, logging the outcome.
//   msg - message to send
276 private void send(Message msg){
277 byte[] serializedMessage = null;
279 serializedMessage = Message.serialize(msg);
// A serialization failure is logged at debug level with the message only.
// NOTE(review): the caught exception e is not passed to the logger, so its cause is lost;
// any statements following the log call are not visible in this listing.
280 } catch (Exception e) {
281 _logger.debug("Unexpected error during serialization of response [{}]", msg);
// Nothing is sent when serialization produced no bytes.
285 if (serializedMessage != null)
286 if (socket.send(serializedMessage))
287 _logger.info("Response sent [{}]", msg);
// This else pairs with the inner if: it runs when socket.send(...) returns false.
288 else _logger.debug("Failed to send serialized message");