Re-added config.version to config-module-archetype.
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientRequestHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connector.remoterpc;
10
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.zeromq.ZMQ;
16
17 import java.io.IOException;
18 import java.util.Map;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.SynchronousQueue;
22 import java.util.concurrent.ThreadPoolExecutor;
23 import java.util.concurrent.TimeUnit;
24
25 /**
26  *
27  */
28 class ClientRequestHandler implements AutoCloseable{
29
30   private Logger _logger = LoggerFactory.getLogger(ClientRequestHandler.class);
31   private final String DEFAULT_NAME = "remoterpc-client-worker";
32   private final String INPROC_PROTOCOL_PREFIX = "inproc://";
33   private final String TCP_PROTOCOL_PREFIX = "tcp://";
34
35   private ZMQ.Context context;
36
37   /*
38    * Worker thread pool. Each thread runs a ROUTER-DEALER pair
39    */
40   private ExecutorService workerPool;
41
42   /*
43    * Set of remote servers this client is currently connected to
44    */
45   private Map<String, String> connectedServers;
46
47   protected ClientRequestHandler(ZMQ.Context context) {
48     this.context = context;
49     connectedServers = new ConcurrentHashMap<String, String>();
50     start();
51   }
52
53   /**
54    * Starts a pool of worker as needed. A worker thread that has not been used for 5 min
55    * is terminated and removed from the pool. If thread dies due to an exception, its
56    * restarted.
57    */
58   private void start(){
59
60     workerPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
61         5L, TimeUnit.MINUTES,
62         new SynchronousQueue<Runnable>()){
63
64       @Override
65       protected void afterExecute(Runnable r, Throwable t) {
66         if (isTerminating() || isTerminated() || isShutdown())
67           return;
68
69         Worker worker = (Worker) r;
70         Preconditions.checkState( worker != null );
71         String remoteServerAddress = worker.getRemoteServerAddress();
72         connectedServers.remove(remoteServerAddress);
73
74         if ( t != null ){
75           _logger.debug("Exception caught while terminating worker [{},{}]. " +
76               "Restarting worker...", t.getClass(), t.getMessage());
77
78           connectedServers.put(remoteServerAddress, remoteServerAddress);
79           this.execute(r);
80         }
81         super.afterExecute(r, null);
82       }
83     };
84   }
85
86   public Message handle(Message request) throws IOException, ClassNotFoundException, InterruptedException {
87
88     String remoteServerAddress = request.getRecipient();
89     //if we already have router-dealer bridge setup for this address the send request
90     //otherwise first create the bridge and then send request
91     if ( connectedServers.containsKey(remoteServerAddress) )
92       return sendMessage(request, remoteServerAddress);
93
94     else{
95       workerPool.execute(new Worker(remoteServerAddress));
96       connectedServers.put(remoteServerAddress, remoteServerAddress);
97       //give little time for sockets to get initialized
98       //TODO: Add socket ping-pong message to ensure socket init rather than thread.sleep.
99       Thread.sleep(1000);
100       return sendMessage(request, remoteServerAddress);
101     }
102   }
103
104   private Message sendMessage(Message request, String address) throws IOException, ClassNotFoundException {
105     Message response = null;
106     ZMQ.Socket socket = context.socket(ZMQ.REQ);
107
108     try {
109       String inProcessSocketAddress = INPROC_PROTOCOL_PREFIX + address;
110       socket.connect( inProcessSocketAddress );
111       _logger.debug("Sending request [{}]", request);
112       socket.send(Message.serialize(request));
113       _logger.info("Request sent. Waiting for reply...");
114       byte[] reply = socket.recv(0);
115       _logger.info("Response received");
116       response = (Message) Message.deserialize(reply);
117       _logger.debug("Response [{}]", response);
118     } finally {
119       socket.close();
120     }
121     return response;
122   }
123
124   /**
125    * This gets called automatically if used with try-with-resources
126    */
127   @Override
128   public void close(){
129     workerPool.shutdown();
130     _logger.info("Request Handler closed");
131   }
132
133   /**
134    * Total number of workers in the pool. Number of workers represent
135    * number of remote servers {@link org.opendaylight.controller.sal.connector.remoterpc.ClientImpl} is connected to.
136    *
137    * @return worker count
138    */
139   public int getWorkerCount(){
140
141     if (workerPool == null) return 0;
142
143     return ((ThreadPoolExecutor)workerPool).getActiveCount();
144   }
145   /**
146    * Handles RPC request
147    */
148   private class Worker implements Runnable {
149     private String name;
150     private String remoteServer;  //<serverip:rpc-port>
151
152     public Worker(String address){
153       this.name = DEFAULT_NAME + "[" + address + "]";
154       this.remoteServer = address;
155     }
156
157     public String getRemoteServerAddress(){
158       return this.remoteServer;
159     }
160
161     @Override
162     public void run() {
163       Thread.currentThread().setName(name);
164       _logger.debug("Starting ... ");
165
166       ZMQ.Socket router = context.socket(ZMQ.ROUTER);
167       ZMQ.Socket dealer = context.socket(ZMQ.DEALER);
168
169       try {
170         int success = router.bind(INPROC_PROTOCOL_PREFIX + remoteServer);
171         Preconditions.checkState(-1 != success, "Could not bind to " + remoteServer);
172
173         dealer.connect(TCP_PROTOCOL_PREFIX + remoteServer);
174
175         _logger.info("Worker started for [{}]", remoteServer);
176
177         //TODO: Add capture handler
178         //This code will block until the zmq context is terminated.
179         ZMQ.proxy(router, dealer, null);
180
181       } catch (Exception e) {
182         _logger.debug("Ignoring exception [{}, {}]", e.getClass(), e.getMessage());
183       } finally {
184         try {
185           router.close();
186           dealer.close();
187         } catch (Exception x) {
188           _logger.debug("Exception while closing socket [{}]", x);
189         }
190         _logger.debug("Closing...");
191       }
192     }
193   }
194 }