Cosmetics: check in pom.xml files as _sort_pom_ wants them to be
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerRequestHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connector.remoterpc;
10
11 import org.opendaylight.controller.sal.connector.api.RpcRouter;
12 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
13 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
14 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
15 import org.opendaylight.controller.sal.core.api.Broker;
16 import org.opendaylight.yangtools.yang.common.QName;
17 import org.opendaylight.yangtools.yang.common.RpcError;
18 import org.opendaylight.yangtools.yang.common.RpcResult;
19 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 import org.zeromq.ZMQ;
23
24 import java.io.IOException;
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.Collection;
28 import java.util.concurrent.*;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 /**
32  *
33  */
34 public class ServerRequestHandler implements AutoCloseable{
35
36   private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
37   private final String DEFAULT_NAME = "remote-rpc-worker";
38   private String dealerAddress;
39   private String serverAddress;
40   private int workerCount;
41   private ZMQ.Context context;
42   private Broker.ProviderSession broker;
43
44   private RequestHandlerThreadPool workerPool;
45   private final AtomicInteger threadId = new AtomicInteger();
46
47   public ServerRequestHandler(ZMQ.Context context,
48                               Broker.ProviderSession session,
49                               int workerCount,
50                               String dealerAddress,
51                               String serverAddress) {
52     this.context       = context;
53     this.dealerAddress = dealerAddress;
54     this.serverAddress = serverAddress;
55     this.broker        = session;
56     this.workerCount   = workerCount;
57   }
58
59   public ThreadPoolExecutor getWorkerPool(){
60     return workerPool;
61   }
62
63   public void start(){
64     workerPool = new RequestHandlerThreadPool(
65         workerCount, workerCount,
66         0L, TimeUnit.MILLISECONDS,
67         new LinkedBlockingQueue<Runnable>());
68     //unbound is ok. Task will never be submitted
69
70     for (int i=0;i<workerCount;i++){
71       workerPool.execute(new Worker(threadId.incrementAndGet()));
72     }
73   }
74
75   /**
76    * This gets called automatically if used with try-with-resources
77    * @throws Exception
78    */
79   @Override
80   public void close() throws Exception {
81     if (workerPool != null)
82       workerPool.shutdown();
83     _logger.info("Request Handler closed");
84   }
85
86   /**
87    * Worker to handles RPC request
88    */
89   private class Worker implements Runnable {
90     private String name;
91
92     public Worker(int id){
93       this.name = DEFAULT_NAME + "-" + id;
94     }
95
96     @Override
97     public void run() {
98       Thread.currentThread().setName(name);
99       _logger.debug("Starting... ");
100       ZMQ.Socket socket = null;
101
102       try {
103         socket = context.socket(ZMQ.REP);
104         socket.connect(dealerAddress);
105
106         while (!Thread.currentThread().isInterrupted()) {
107
108           MessageHandler handler = new MessageHandler(socket);
109           handler.receiveMessage();
110
111           if (handler.hasMessageForBroker()) {
112
113             Message request = handler.getMessage();
114             Future<RpcResult<CompositeNode>> rpc = null;
115             RpcResult<CompositeNode> result = null;
116
117             //TODO Call this in a new thread with timeout
118             try {
119               rpc = broker.rpc(
120                   (QName) request.getRoute().getType(),
121                   XmlUtils.xmlToCompositeNode((String) request.getPayload()));
122
123               result = (rpc != null) ? rpc.get() : null;
124
125               handler.sendResponse(result);
126
127             } catch (Exception e) {
128               _logger.debug("Broker threw  [{}]", e);
129               handler.sendError(e.getMessage());
130             }
131           }
132
133         }
134       } catch (Exception e) {
135         printException(e);
136       } finally {
137         closeSocket(socket);
138       }
139     }
140
141     private void printException(Exception e) {
142       try (StringWriter s = new StringWriter();
143            PrintWriter p = new PrintWriter(s)) {
144         e.printStackTrace(p);
145         _logger.debug(s.toString());
146       } catch (IOException e1) {/*Ignore and continue*/ }
147     }
148
149     private void closeSocket(ZMQ.Socket socket) {
150       try {
151         if (socket != null) socket.close();
152       } catch (Exception x) {
153         _logger.debug("Exception while closing socket [{}]", x);
154       } finally {
155         if (socket != null) socket.close();
156       }
157       _logger.debug("Closing...");
158     }
159   }
160
161
162   /**
163    *
164    */
165   public class RequestHandlerThreadPool extends ThreadPoolExecutor{
166
167     public RequestHandlerThreadPool(int corePoolSize,
168                                     int maximumPoolSize,
169                                     long keepAliveTime,
170                                     TimeUnit unit,
171                                     BlockingQueue<Runnable> workQueue) {
172       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
173     }
174
175     @Override
176     protected void afterExecute(Runnable r, Throwable t) {
177       if (isTerminating() || isTerminated() || isShutdown())
178         return;
179
180       if ( t != null ){
181         _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
182       }
183
184       this.execute(new Worker(threadId.incrementAndGet()));
185       super.afterExecute(r, null);
186     }
187   }
188
189   class MessageHandler{
190     private ZMQ.Socket socket;
191     private Message message;          //parsed message received on zmq server port
192     private boolean messageForBroker = false; //if the message is valid and not a "ping" message
193
194     public MessageHandler(ZMQ.Socket socket){
195       this.socket = socket;
196     }
197
198     void receiveMessage(){
199       byte[] bytes = socket.recv(); //this blocks
200       _logger.debug("Received bytes:[{}]", bytes.length);
201
202       Object objectRecvd = null;
203       try{
204         objectRecvd = Message.deserialize(bytes);
205       }catch (Exception e){
206         sendError(e.getMessage());
207         return;
208       }
209
210       if (!(objectRecvd instanceof Message)) {
211         sendError("Invalid message received");
212         return;
213       }
214
215       message = (Message) objectRecvd;
216
217       _logger.info("Received request [{}]", message);
218
219       if (Message.MessageType.PING == message.getType()){
220         sendPong();
221         return;
222       }
223
224       messageForBroker = true;
225     }
226
227     boolean hasMessageForBroker(){
228       return messageForBroker;
229     }
230
231     Message getMessage(){
232       return message;
233     }
234
235     void sendResponse(RpcResult<CompositeNode> result){
236       CompositeNode payload = (result != null) ? result.getResult() : null;
237
238       String recipient = null;
239       RpcRouter.RouteIdentifier routeId = null;
240
241       if (message != null) {
242         recipient = message.getSender();
243         routeId   = message.getRoute();
244       }
245
246       Message response = new Message.MessageBuilder()
247           .type(Message.MessageType.RESPONSE)
248           .sender(serverAddress)
249           .recipient(recipient)
250           .route(routeId)
251           .payload(XmlUtils.compositeNodeToXml(payload))
252           .build();
253
254       send(response);
255     }
256
257     private void sendError(String msg){
258       Message errorResponse = new Message.MessageBuilder()
259           .type(Message.MessageType.ERROR)
260           .sender(serverAddress)
261           .payload(msg)
262           .build();
263
264       send(errorResponse);
265     }
266
267     private void sendPong(){
268       Message pong = new Message.MessageBuilder()
269           .type(Message.MessageType.PONG)
270           .sender(serverAddress)
271           .build();
272
273       send(pong);
274     }
275
276     private void send(Message msg){
277       byte[] serializedMessage = null;
278       try {
279         serializedMessage = Message.serialize(msg);
280       } catch (Exception e) {
281         _logger.debug("Unexpected error during serialization of response [{}]", msg);
282         return;
283       }
284
285       if (serializedMessage != null)
286         if (socket.send(serializedMessage))
287           _logger.info("Response sent [{}]", msg);
288         else  _logger.debug("Failed to send serialized message");
289     }
290   }
291 }