Merge "Log transactions being allocated"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerRequestHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connector.remoterpc;
10
11 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
12 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
13 import org.opendaylight.controller.sal.core.api.Broker;
14 import org.opendaylight.yangtools.yang.common.QName;
15 import org.opendaylight.yangtools.yang.common.RpcResult;
16 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import org.zeromq.ZMQ;
20
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.util.concurrent.*;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 /**
28  *
29  */
30 public class ServerRequestHandler implements AutoCloseable{
31
32   private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
33   private final String DEFAULT_NAME = "remote-rpc-worker";
34   private String dealerAddress;
35   private String serverAddress;
36   private int workerCount;
37   private ZMQ.Context context;
38   private Broker.ProviderSession broker;
39
40   private RequestHandlerThreadPool workerPool;
41   private final AtomicInteger threadId = new AtomicInteger();
42
43   public ServerRequestHandler(ZMQ.Context context,
44                               Broker.ProviderSession session,
45                               int workerCount,
46                               String dealerAddress,
47                               String serverAddress) {
48     this.context       = context;
49     this.dealerAddress = dealerAddress;
50     this.serverAddress = serverAddress;
51     this.broker        = session;
52     this.workerCount   = workerCount;
53   }
54
55   public ThreadPoolExecutor getWorkerPool(){
56     return workerPool;
57   }
58
59   public void start(){
60     workerPool = new RequestHandlerThreadPool(
61         workerCount, workerCount,
62         0L, TimeUnit.MILLISECONDS,
63         new LinkedBlockingQueue<Runnable>());
64     //unbound is ok. Task will never be submitted
65
66     for (int i=0;i<workerCount;i++){
67       workerPool.execute(new Worker(threadId.incrementAndGet()));
68     }
69   }
70
71   /**
72    * This gets called automatically if used with try-with-resources
73    * @throws Exception
74    */
75   @Override
76   public void close() throws Exception {
77     if (workerPool != null)
78       workerPool.shutdown();
79     _logger.info("Request Handler closed");
80   }
81
82   /**
83    * Worker to handles RPC request
84    */
85   private class Worker implements Runnable {
86     private String name;
87
88     public Worker(int id){
89       this.name = DEFAULT_NAME + "-" + id;
90     }
91
92     @Override
93     public void run() {
94       Thread.currentThread().setName(name);
95       _logger.debug("Starting... ");
96       ZMQ.Socket socket = null;
97
98       try {
99         socket = context.socket(ZMQ.REP);
100         socket.connect(dealerAddress);
101
102         while (!Thread.currentThread().isInterrupted()) {
103
104           Message request = parseMessage(socket);
105           _logger.debug("Received rpc request [{}]", request);
106
107           if (request != null) {
108             // Call broker to process the message then reply
109             Future<RpcResult<CompositeNode>> rpc = null;
110             RpcResult<CompositeNode> result = null;
111
112             //TODO Call this in a new thread with timeout
113             try {
114               rpc = broker.rpc(
115                   (QName) request.getRoute().getType(),
116                   XmlUtils.xmlToCompositeNode((String) request.getPayload()));
117
118               result = (rpc != null) ? rpc.get() : null;
119
120             } catch (Exception e) {
121               _logger.debug("Broker threw  [{}]", e);
122             }
123
124             CompositeNode payload = (result != null) ? result.getResult() : null;
125
126             Message response = new Message.MessageBuilder()
127                 .type(Message.MessageType.RESPONSE)
128                 .sender(serverAddress)
129                 .route(request.getRoute())
130                 .payload(XmlUtils.compositeNodeToXml(payload))
131                 .build();
132
133             _logger.debug("Sending rpc response [{}]", response);
134
135             try {
136               socket.send(Message.serialize(response));
137             } catch (Exception e) {
138               _logger.debug("rpc response send failed for message [{}]", response);
139               _logger.debug("{}", e);
140             }
141           }
142         }
143       } catch (Exception e) {
144         printException(e);
145       } finally {
146         closeSocket(socket);
147       }
148     }
149
150     /**
151      * @param socket
152      * @return
153      */
154     private Message parseMessage(ZMQ.Socket socket) throws Exception {
155       byte[] bytes = socket.recv(); //this blocks
156       _logger.debug("Received bytes:[{}]", bytes.length);
157       return (Message) Message.deserialize(bytes);
158     }
159
160     private void printException(Exception e) {
161       try (StringWriter s = new StringWriter();
162            PrintWriter p = new PrintWriter(s)) {
163         e.printStackTrace(p);
164         _logger.debug(s.toString());
165       } catch (IOException e1) {/*Ignore and continue*/ }
166     }
167
168     private void closeSocket(ZMQ.Socket socket) {
169       try {
170         if (socket != null) socket.close();
171       } catch (Exception x) {
172         _logger.debug("Exception while closing socket [{}]", x);
173       } finally {
174         if (socket != null) socket.close();
175       }
176       _logger.debug("Closing...");
177     }
178   }
179
180
181   /**
182    *
183    */
184   public class RequestHandlerThreadPool extends ThreadPoolExecutor{
185
186     public RequestHandlerThreadPool(int corePoolSize,
187                                     int maximumPoolSize,
188                                     long keepAliveTime,
189                                     TimeUnit unit,
190                                     BlockingQueue<Runnable> workQueue) {
191       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
192     }
193
194     @Override
195     protected void afterExecute(Runnable r, Throwable t) {
196       if (isTerminating() || isTerminated() || isShutdown())
197         return;
198
199       if ( t != null ){
200         _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
201       }
202
203       this.execute(new Worker(threadId.incrementAndGet()));
204       super.afterExecute(r, null);
205     }
206   }
207 }