/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.sal.connector.remoterpc; import com.google.common.base.Preconditions; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * */ class ClientRequestHandler implements AutoCloseable{ private Logger _logger = LoggerFactory.getLogger(ClientRequestHandler.class); private final String DEFAULT_NAME = "remoterpc-client-worker"; private final String INPROC_PROTOCOL_PREFIX = "inproc://"; private final String TCP_PROTOCOL_PREFIX = "tcp://"; private ZMQ.Context context; /* * Worker thread pool. Each thread runs a ROUTER-DEALER pair */ private ExecutorService workerPool; /* * Set of remote servers this client is currently connected to */ private Map connectedServers; protected ClientRequestHandler(ZMQ.Context context) { this.context = context; connectedServers = new ConcurrentHashMap(); start(); } /** * Starts a pool of worker as needed. A worker thread that has not been used for 5 min * is terminated and removed from the pool. If thread dies due to an exception, its * restarted. */ private void start(){ workerPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5L, TimeUnit.MINUTES, new SynchronousQueue()){ @Override protected void afterExecute(Runnable r, Throwable t) { if (isTerminating() || isTerminated() || isShutdown()) return; Worker worker = (Worker) r; Preconditions.checkState( worker != null ); String remoteServerAddress = worker.getRemoteServerAddress(); connectedServers.remove(remoteServerAddress); if ( t != null ){ _logger.debug("Exception caught while terminating worker [{},{}]. " + "Restarting worker...", t.getClass(), t.getMessage()); connectedServers.put(remoteServerAddress, remoteServerAddress); this.execute(r); } super.afterExecute(r, null); } }; } public Message handle(Message request) throws IOException, ClassNotFoundException, InterruptedException { String remoteServerAddress = request.getRecipient(); //if we already have router-dealer bridge setup for this address the send request //otherwise first create the bridge and then send request if ( connectedServers.containsKey(remoteServerAddress) ) return sendMessage(request, remoteServerAddress); else{ workerPool.execute(new Worker(remoteServerAddress)); connectedServers.put(remoteServerAddress, remoteServerAddress); //give little time for sockets to get initialized //TODO: Add socket ping-pong message to ensure socket init rather than thread.sleep. Thread.sleep(1000); return sendMessage(request, remoteServerAddress); } } private Message sendMessage(Message request, String address) throws IOException, ClassNotFoundException { Message response = null; ZMQ.Socket socket = context.socket(ZMQ.REQ); try { socket.connect( INPROC_PROTOCOL_PREFIX + address); socket.send(Message.serialize(request)); _logger.debug("Request sent. Waiting for reply..."); byte[] reply = socket.recv(0); _logger.debug("Response received"); response = (Message) Message.deserialize(reply); } finally { socket.close(); } return response; } /** * This gets called automatically if used with try-with-resources */ @Override public void close(){ workerPool.shutdown(); _logger.info("Request Handler closed"); } /** * Total number of workers in the pool. Number of workers represent * number of remote servers {@link org.opendaylight.controller.sal.connector.remoterpc.ClientImpl} is connected to. * * @return worker count */ public int getWorkerCount(){ if (workerPool == null) return 0; return ((ThreadPoolExecutor)workerPool).getActiveCount(); } /** * Handles RPC request */ private class Worker implements Runnable { private String name; private String remoteServer; // public Worker(String address){ this.name = DEFAULT_NAME + "[" + address + "]"; this.remoteServer = address; } public String getRemoteServerAddress(){ return this.remoteServer; } @Override public void run() { Thread.currentThread().setName(name); _logger.debug("Starting ... "); ZMQ.Socket router = context.socket(ZMQ.ROUTER); ZMQ.Socket dealer = context.socket(ZMQ.DEALER); try { int success = router.bind(INPROC_PROTOCOL_PREFIX + remoteServer); Preconditions.checkState(-1 != success, "Could not bind to " + remoteServer); dealer.connect(TCP_PROTOCOL_PREFIX + remoteServer); _logger.info("Worker started for [{}]", remoteServer); //TODO: Add capture handler //This code will block until the zmq context is terminated. ZMQ.proxy(router, dealer, null); } catch (Exception e) { _logger.debug("Ignoring exception [{}, {}]", e.getClass(), e.getMessage()); } finally { try { router.close(); dealer.close(); } catch (Exception x) { _logger.debug("Exception while closing socket [{}]", x); } _logger.debug("Closing..."); } } } }