2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.sal.connector.remoterpc;
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.zeromq.ZMQ;
17 import java.io.IOException;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.SynchronousQueue;
22 import java.util.concurrent.ThreadPoolExecutor;
23 import java.util.concurrent.TimeUnit;
28 class ClientRequestHandler implements AutoCloseable{
30 private Logger _logger = LoggerFactory.getLogger(ClientRequestHandler.class);
31 private final String DEFAULT_NAME = "remoterpc-client-worker";
32 private final String INPROC_PROTOCOL_PREFIX = "inproc://";
33 private final String TCP_PROTOCOL_PREFIX = "tcp://";
35 private ZMQ.Context context;
38 * Worker thread pool. Each thread runs a ROUTER-DEALER pair
40 private ExecutorService workerPool;
43 * Set of remote servers this client is currently connected to
45 private Map<String, String> connectedServers;
47 protected ClientRequestHandler(ZMQ.Context context) {
48 this.context = context;
49 connectedServers = new ConcurrentHashMap<String, String>();
54 * Starts a pool of worker as needed. A worker thread that has not been used for 5 min
55 * is terminated and removed from the pool. If thread dies due to an exception, its
60 workerPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
62 new SynchronousQueue<Runnable>()){
65 protected void afterExecute(Runnable r, Throwable t) {
66 if (isTerminating() || isTerminated() || isShutdown())
69 Worker worker = (Worker) r;
70 Preconditions.checkState( worker != null );
71 String remoteServerAddress = worker.getRemoteServerAddress();
72 connectedServers.remove(remoteServerAddress);
75 _logger.debug("Exception caught while terminating worker [{},{}]. " +
76 "Restarting worker...", t.getClass(), t.getMessage());
78 connectedServers.put(remoteServerAddress, remoteServerAddress);
81 super.afterExecute(r, null);
86 public Message handle(Message request) throws IOException, ClassNotFoundException, InterruptedException {
88 String remoteServerAddress = request.getRecipient();
89 //if we already have router-dealer bridge setup for this address the send request
90 //otherwise first create the bridge and then send request
91 if ( connectedServers.containsKey(remoteServerAddress) )
92 return sendMessage(request, remoteServerAddress);
94 workerPool.execute(new Worker(remoteServerAddress));
95 connectedServers.put(remoteServerAddress, remoteServerAddress);
96 //give little time for sockets to get initialized
97 //TODO: Add socket ping-pong message to ensure socket init rather than thread.sleep.
99 return sendMessage(request, remoteServerAddress);
103 private Message sendMessage(Message request, String address) throws IOException, ClassNotFoundException {
104 Message response = null;
105 ZMQ.Socket socket = context.socket(ZMQ.REQ);
108 socket.connect( INPROC_PROTOCOL_PREFIX + address);
109 socket.send(Message.serialize(request));
110 _logger.debug("Request sent. Waiting for reply...");
111 byte[] reply = socket.recv(0);
112 _logger.debug("Response received");
113 response = (Message) Message.deserialize(reply);
121 * This gets called automatically if used with try-with-resources
125 workerPool.shutdown();
126 _logger.info("Request Handler closed");
130 * Total number of workers in the pool. Number of workers represent
131 * number of remote servers {@link org.opendaylight.controller.sal.connector.remoterpc.ClientImpl} is connected to.
133 * @return worker count
135 public int getWorkerCount(){
137 if (workerPool == null) return 0;
139 return ((ThreadPoolExecutor)workerPool).getActiveCount();
142 * Handles RPC request
144 private class Worker implements Runnable {
146 private String remoteServer; //<servername:rpc-port>
148 public Worker(String address){
149 this.name = DEFAULT_NAME + "[" + address + "]";
150 this.remoteServer = address;
153 public String getRemoteServerAddress(){
154 return this.remoteServer;
159 Thread.currentThread().setName(name);
160 _logger.debug("Starting ... ");
162 ZMQ.Socket router = context.socket(ZMQ.ROUTER);
163 ZMQ.Socket dealer = context.socket(ZMQ.DEALER);
166 int success = router.bind(INPROC_PROTOCOL_PREFIX + remoteServer);
167 Preconditions.checkState(-1 != success, "Could not bind to " + remoteServer);
169 dealer.connect(TCP_PROTOCOL_PREFIX + remoteServer);
171 _logger.info("Worker started for [{}]", remoteServer);
173 //TODO: Add capture handler
174 //This code will block until the zmq context is terminated.
175 ZMQ.proxy(router, dealer, null);
177 } catch (Exception e) {
178 _logger.debug("Ignoring exception [{}, {}]", e.getClass(), e.getMessage());
183 } catch (Exception x) {
184 _logger.debug("Exception while closing socket [{}]", x);
186 _logger.debug("Closing...");