Enhancements to remote rpc server. Using zmq router-dealer bridge to make the server...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientRequestHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connector.remoterpc;
10
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.zeromq.ZMQ;
16
17 import java.io.IOException;
18 import java.util.Map;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.SynchronousQueue;
22 import java.util.concurrent.ThreadPoolExecutor;
23 import java.util.concurrent.TimeUnit;
24
25 /**
26  *
27  */
28 class ClientRequestHandler implements AutoCloseable{
29
30   private Logger _logger = LoggerFactory.getLogger(ClientRequestHandler.class);
31   private final String DEFAULT_NAME = "remoterpc-client-worker";
32   private final String INPROC_PROTOCOL_PREFIX = "inproc://";
33   private final String TCP_PROTOCOL_PREFIX = "tcp://";
34
35   private ZMQ.Context context;
36
37   /*
38    * Worker thread pool. Each thread runs a ROUTER-DEALER pair
39    */
40   private ExecutorService workerPool;
41
42   /*
43    * Set of remote servers this client is currently connected to
44    */
45   private Map<String, String> connectedServers;
46
47   protected ClientRequestHandler(ZMQ.Context context) {
48     this.context = context;
49     connectedServers = new ConcurrentHashMap<String, String>();
50     start();
51   }
52
53   /**
54    * Starts a pool of worker as needed. A worker thread that has not been used for 5 min
55    * is terminated and removed from the pool. If thread dies due to an exception, its
56    * restarted.
57    */
58   private void start(){
59
60     workerPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
61         5L, TimeUnit.MINUTES,
62         new SynchronousQueue<Runnable>()){
63
64       @Override
65       protected void afterExecute(Runnable r, Throwable t) {
66         if (isTerminating() || isTerminated() || isShutdown())
67           return;
68
69         Worker worker = (Worker) r;
70         Preconditions.checkState( worker != null );
71         String remoteServerAddress = worker.getRemoteServerAddress();
72         connectedServers.remove(remoteServerAddress);
73
74         if ( t != null ){
75           _logger.debug("Exception caught while terminating worker [{},{}]. " +
76               "Restarting worker...", t.getClass(), t.getMessage());
77
78           connectedServers.put(remoteServerAddress, remoteServerAddress);
79           this.execute(r);
80         }
81         super.afterExecute(r, null);
82       }
83     };
84   }
85
86   public Message handle(Message request) throws IOException, ClassNotFoundException, InterruptedException {
87
88     String remoteServerAddress = request.getRecipient();
89     //if we already have router-dealer bridge setup for this address the send request
90     //otherwise first create the bridge and then send request
91     if ( connectedServers.containsKey(remoteServerAddress) )
92       return sendMessage(request, remoteServerAddress);
93     else{
94       workerPool.execute(new Worker(remoteServerAddress));
95       connectedServers.put(remoteServerAddress, remoteServerAddress);
96       //give little time for sockets to get initialized
97       //TODO: Add socket ping-pong message to ensure socket init rather than thread.sleep.
98       Thread.sleep(1000);
99       return sendMessage(request, remoteServerAddress);
100     }
101   }
102
103   private Message sendMessage(Message request, String address) throws IOException, ClassNotFoundException {
104     Message response = null;
105     ZMQ.Socket socket = context.socket(ZMQ.REQ);
106
107     try {
108       socket.connect( INPROC_PROTOCOL_PREFIX + address);
109       socket.send(Message.serialize(request));
110       _logger.debug("Request sent. Waiting for reply...");
111       byte[] reply = socket.recv(0);
112       _logger.debug("Response received");
113       response = (Message) Message.deserialize(reply);
114     } finally {
115       socket.close();
116     }
117     return response;
118   }
119
120   /**
121    * This gets called automatically if used with try-with-resources
122    */
123   @Override
124   public void close(){
125     workerPool.shutdown();
126     _logger.info("Request Handler closed");
127   }
128
129   /**
130    * Total number of workers in the pool. Number of workers represent
131    * number of remote servers {@link org.opendaylight.controller.sal.connector.remoterpc.ClientImpl} is connected to.
132    *
133    * @return worker count
134    */
135   public int getWorkerCount(){
136
137     if (workerPool == null) return 0;
138
139     return ((ThreadPoolExecutor)workerPool).getActiveCount();
140   }
141   /**
142    * Handles RPC request
143    */
144   private class Worker implements Runnable {
145     private String name;
146     private String remoteServer;  //<servername:rpc-port>
147
148     public Worker(String address){
149       this.name = DEFAULT_NAME + "[" + address + "]";
150       this.remoteServer = address;
151     }
152
153     public String getRemoteServerAddress(){
154       return this.remoteServer;
155     }
156
157     @Override
158     public void run() {
159       Thread.currentThread().setName(name);
160       _logger.debug("Starting ... ");
161
162       ZMQ.Socket router = context.socket(ZMQ.ROUTER);
163       ZMQ.Socket dealer = context.socket(ZMQ.DEALER);
164
165       try {
166         int success = router.bind(INPROC_PROTOCOL_PREFIX + remoteServer);
167         Preconditions.checkState(-1 != success, "Could not bind to " + remoteServer);
168
169         dealer.connect(TCP_PROTOCOL_PREFIX + remoteServer);
170
171         _logger.info("Worker started for [{}]", remoteServer);
172
173         //TODO: Add capture handler
174         //This code will block until the zmq context is terminated.
175         ZMQ.proxy(router, dealer, null);
176
177       } catch (Exception e) {
178         _logger.debug("Ignoring exception [{}, {}]", e.getClass(), e.getMessage());
179       } finally {
180         try {
181           router.close();
182           dealer.close();
183         } catch (Exception x) {
184           _logger.debug("Exception while closing socket [{}]", x);
185         }
186         _logger.debug("Closing...");
187       }
188     }
189   }
190 }