BUG-731: fix warnings from missing generic arguments
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerRequestHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connector.remoterpc;
10
11 import java.io.IOException;
12 import java.io.PrintWriter;
13 import java.io.StringWriter;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ThreadPoolExecutor;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import org.opendaylight.controller.sal.connector.api.RpcRouter;
22 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
23 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
24 import org.opendaylight.controller.sal.core.api.Broker;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.zeromq.ZMQ;
31
32 /**
33  *
34  */
35 public class ServerRequestHandler implements AutoCloseable{
36
37   private final Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
38   private final String DEFAULT_NAME = "remote-rpc-worker";
39   private final String dealerAddress;
40   private final String serverAddress;
41   private final int workerCount;
42   private final ZMQ.Context context;
43   private final Broker.ProviderSession broker;
44
45   private RequestHandlerThreadPool workerPool;
46   private final AtomicInteger threadId = new AtomicInteger();
47
48   public ServerRequestHandler(ZMQ.Context context,
49                               Broker.ProviderSession session,
50                               int workerCount,
51                               String dealerAddress,
52                               String serverAddress) {
53     this.context       = context;
54     this.dealerAddress = dealerAddress;
55     this.serverAddress = serverAddress;
56     this.broker        = session;
57     this.workerCount   = workerCount;
58   }
59
60   public ThreadPoolExecutor getWorkerPool(){
61     return workerPool;
62   }
63
64   public void start(){
65     workerPool = new RequestHandlerThreadPool(
66         workerCount, workerCount,
67         0L, TimeUnit.MILLISECONDS,
68         new LinkedBlockingQueue<Runnable>());
69     //unbound is ok. Task will never be submitted
70
71     for (int i=0;i<workerCount;i++){
72       workerPool.execute(new Worker(threadId.incrementAndGet()));
73     }
74   }
75
76   /**
77    * This gets called automatically if used with try-with-resources
78    * @throws Exception
79    */
80   @Override
81   public void close() throws Exception {
82     if (workerPool != null)
83       workerPool.shutdown();
84     _logger.info("Request Handler closed");
85   }
86
87   /**
88    * Worker to handles RPC request
89    */
90   private class Worker implements Runnable {
91     private final String name;
92
93     public Worker(int id){
94       this.name = DEFAULT_NAME + "-" + id;
95     }
96
97     @Override
98     public void run() {
99       Thread.currentThread().setName(name);
100       _logger.debug("Starting... ");
101       ZMQ.Socket socket = null;
102
103       try {
104         socket = context.socket(ZMQ.REP);
105         socket.connect(dealerAddress);
106
107         while (!Thread.currentThread().isInterrupted()) {
108
109           MessageHandler handler = new MessageHandler(socket);
110           handler.receiveMessage();
111
112           if (handler.hasMessageForBroker()) {
113
114             Message request = handler.getMessage();
115             Future<RpcResult<CompositeNode>> rpc = null;
116             RpcResult<CompositeNode> result = null;
117
118             //TODO Call this in a new thread with timeout
119             try {
120               rpc = broker.rpc(
121                   (QName) request.getRoute().getType(),
122                   XmlUtils.xmlToCompositeNode((String) request.getPayload()));
123
124               result = (rpc != null) ? rpc.get() : null;
125
126               handler.sendResponse(result);
127
128             } catch (Exception e) {
129               _logger.debug("Broker threw  [{}]", e);
130               handler.sendError(e.getMessage());
131             }
132           }
133
134         }
135       } catch (Exception e) {
136         printException(e);
137       } finally {
138         closeSocket(socket);
139       }
140     }
141
142     private void printException(Exception e) {
143       try (StringWriter s = new StringWriter();
144            PrintWriter p = new PrintWriter(s)) {
145         e.printStackTrace(p);
146         _logger.debug(s.toString());
147       } catch (IOException e1) {/*Ignore and continue*/ }
148     }
149
150     private void closeSocket(ZMQ.Socket socket) {
151       try {
152         if (socket != null) socket.close();
153       } catch (Exception x) {
154         _logger.debug("Exception while closing socket [{}]", x);
155       } finally {
156         if (socket != null) socket.close();
157       }
158       _logger.debug("Closing...");
159     }
160   }
161
162
163   /**
164    *
165    */
166   public class RequestHandlerThreadPool extends ThreadPoolExecutor{
167
168     public RequestHandlerThreadPool(int corePoolSize,
169                                     int maximumPoolSize,
170                                     long keepAliveTime,
171                                     TimeUnit unit,
172                                     BlockingQueue<Runnable> workQueue) {
173       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
174     }
175
176     @Override
177     protected void afterExecute(Runnable r, Throwable t) {
178       if (isTerminating() || isTerminated() || isShutdown())
179         return;
180
181       if ( t != null ){
182         _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
183       }
184
185       this.execute(new Worker(threadId.incrementAndGet()));
186       super.afterExecute(r, null);
187     }
188   }
189
190   class MessageHandler{
191     private final ZMQ.Socket socket;
192     private Message message;          //parsed message received on zmq server port
193     private boolean messageForBroker = false; //if the message is valid and not a "ping" message
194
195     public MessageHandler(ZMQ.Socket socket){
196       this.socket = socket;
197     }
198
199     void receiveMessage(){
200       byte[] bytes = socket.recv(); //this blocks
201       _logger.debug("Received bytes:[{}]", bytes.length);
202
203       Object objectRecvd = null;
204       try{
205         objectRecvd = Message.deserialize(bytes);
206       }catch (Exception e){
207         sendError(e.getMessage());
208         return;
209       }
210
211       if (!(objectRecvd instanceof Message)) {
212         sendError("Invalid message received");
213         return;
214       }
215
216       message = (Message) objectRecvd;
217
218       _logger.info("Received request [{}]", message);
219
220       if (Message.MessageType.PING == message.getType()){
221         sendPong();
222         return;
223       }
224
225       messageForBroker = true;
226     }
227
228     boolean hasMessageForBroker(){
229       return messageForBroker;
230     }
231
232     Message getMessage(){
233       return message;
234     }
235
236     void sendResponse(RpcResult<CompositeNode> result){
237       CompositeNode payload = (result != null) ? result.getResult() : null;
238
239       String recipient = null;
240       RpcRouter.RouteIdentifier<?, ?, ?> routeId = null;
241
242       if (message != null) {
243         recipient = message.getSender();
244         routeId   = message.getRoute();
245       }
246
247       Message response = new Message.MessageBuilder()
248           .type(Message.MessageType.RESPONSE)
249           .sender(serverAddress)
250           .recipient(recipient)
251           .route(routeId)
252           .payload(XmlUtils.compositeNodeToXml(payload))
253           .build();
254
255       send(response);
256     }
257
258     private void sendError(String msg){
259       Message errorResponse = new Message.MessageBuilder()
260           .type(Message.MessageType.ERROR)
261           .sender(serverAddress)
262           .payload(msg)
263           .build();
264
265       send(errorResponse);
266     }
267
268     private void sendPong(){
269       Message pong = new Message.MessageBuilder()
270           .type(Message.MessageType.PONG)
271           .sender(serverAddress)
272           .build();
273
274       send(pong);
275     }
276
277     private void send(Message msg){
278       byte[] serializedMessage = null;
279       try {
280         serializedMessage = Message.serialize(msg);
281       } catch (Exception e) {
282         _logger.debug("Unexpected error during serialization of response [{}]", msg);
283         return;
284       }
285
286       if (serializedMessage != null)
287         if (socket.send(serializedMessage))
288           _logger.info("Response sent [{}]", msg);
289         else  _logger.debug("Failed to send serialized message");
290     }
291   }
292 }