/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.sal.connector.remoterpc;

import com.google.common.base.Preconditions;
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
28 class ClientRequestHandler implements AutoCloseable{
30 private Logger _logger = LoggerFactory.getLogger(ClientRequestHandler.class);
31 private final String DEFAULT_NAME = "remoterpc-client-worker";
32 private final String INPROC_PROTOCOL_PREFIX = "inproc://";
33 private final String TCP_PROTOCOL_PREFIX = "tcp://";
35 private ZMQ.Context context;
38 * Worker thread pool. Each thread runs a ROUTER-DEALER pair
40 private ExecutorService workerPool;
43 * Set of remote servers this client is currently connected to
45 private Map<String, String> connectedServers;
47 protected ClientRequestHandler(ZMQ.Context context) {
48 this.context = context;
49 connectedServers = new ConcurrentHashMap<String, String>();
54 * Starts a pool of worker as needed. A worker thread that has not been used for 5 min
55 * is terminated and removed from the pool. If thread dies due to an exception, its
60 workerPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
62 new SynchronousQueue<Runnable>()){
65 protected void afterExecute(Runnable r, Throwable t) {
66 if (isTerminating() || isTerminated() || isShutdown())
69 Worker worker = (Worker) r;
70 Preconditions.checkState( worker != null );
71 String remoteServerAddress = worker.getRemoteServerAddress();
72 connectedServers.remove(remoteServerAddress);
75 _logger.debug("Exception caught while terminating worker [{},{}]. " +
76 "Restarting worker...", t.getClass(), t.getMessage());
78 connectedServers.put(remoteServerAddress, remoteServerAddress);
81 super.afterExecute(r, null);
86 public Message handle(Message request) throws IOException, ClassNotFoundException, InterruptedException {
88 String remoteServerAddress = request.getRecipient();
89 //if we already have router-dealer bridge setup for this address the send request
90 //otherwise first create the bridge and then send request
91 if ( connectedServers.containsKey(remoteServerAddress) )
92 return sendMessage(request, remoteServerAddress);
95 workerPool.execute(new Worker(remoteServerAddress));
96 connectedServers.put(remoteServerAddress, remoteServerAddress);
97 //give little time for sockets to get initialized
98 //TODO: Add socket ping-pong message to ensure socket init rather than thread.sleep.
100 return sendMessage(request, remoteServerAddress);
104 private Message sendMessage(Message request, String address) throws IOException, ClassNotFoundException {
105 Message response = null;
106 ZMQ.Socket socket = context.socket(ZMQ.REQ);
109 String inProcessSocketAddress = INPROC_PROTOCOL_PREFIX + address;
110 socket.connect( inProcessSocketAddress );
111 _logger.debug("Sending request [{}]", request);
112 socket.send(Message.serialize(request));
113 _logger.info("Request sent. Waiting for reply...");
114 byte[] reply = socket.recv(0);
115 _logger.info("Response received");
116 response = (Message) Message.deserialize(reply);
117 _logger.debug("Response [{}]", response);
125 * This gets called automatically if used with try-with-resources
129 workerPool.shutdown();
130 _logger.info("Request Handler closed");
134 * Total number of workers in the pool. Number of workers represent
135 * number of remote servers {@link org.opendaylight.controller.sal.connector.remoterpc.ClientImpl} is connected to.
137 * @return worker count
139 public int getWorkerCount(){
141 if (workerPool == null) return 0;
143 return ((ThreadPoolExecutor)workerPool).getActiveCount();
146 * Handles RPC request
148 private class Worker implements Runnable {
150 private String remoteServer; //<serverip:rpc-port>
152 public Worker(String address){
153 this.name = DEFAULT_NAME + "[" + address + "]";
154 this.remoteServer = address;
157 public String getRemoteServerAddress(){
158 return this.remoteServer;
163 Thread.currentThread().setName(name);
164 _logger.debug("Starting ... ");
166 ZMQ.Socket router = context.socket(ZMQ.ROUTER);
167 ZMQ.Socket dealer = context.socket(ZMQ.DEALER);
170 int success = router.bind(INPROC_PROTOCOL_PREFIX + remoteServer);
171 Preconditions.checkState(-1 != success, "Could not bind to " + remoteServer);
173 dealer.connect(TCP_PROTOCOL_PREFIX + remoteServer);
175 _logger.info("Worker started for [{}]", remoteServer);
177 //TODO: Add capture handler
178 //This code will block until the zmq context is terminated.
179 ZMQ.proxy(router, dealer, null);
181 } catch (Exception e) {
182 _logger.debug("Ignoring exception [{}, {}]", e.getClass(), e.getMessage());
187 } catch (Exception x) {
188 _logger.debug("Exception while closing socket [{}]", x);
190 _logger.debug("Closing...");