2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import java.net.Inet4Address;
11 import java.net.InetAddress;
12 import java.net.NetworkInterface;
13 import java.net.SocketException;
14 import java.util.Enumeration;
15 import java.util.HashSet;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.FutureTask;
20 import java.util.concurrent.TimeUnit;
22 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import org.zeromq.ZMQ;
28 import com.google.common.base.Optional;
29 import com.google.common.base.Preconditions;
32 * ZeroMQ-based implementation of RpcRouter.
// Remote RPC server built on ZeroMQ sockets; declared to implement RemoteRpcServer.
34 public class ServerImpl implements RemoteRpcServer {
// SLF4J logger shared by every method of this class.
36 private final Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
// Single-thread executor that runs the listener task returned by receive();
// created during start-up and shut down in close().
38 private ExecutorService serverPool;
// Request handler created on the listener thread inside receive(); null until then.
// Exposed through getHandler().
39 protected ServerRequestHandler handler;
// Initialised to an empty HashSet during start-up; no other use is visible in this excerpt.
41 private Set<QName> remoteServices;
// Broker session supplied by the caller via setBrokerSession(); may be null.
42 private ProviderSession brokerSession;
// ZMQ context created during start-up; receive() creates its sockets from it.
43 private ZMQ.Context context;
// In-process endpoint the backend (worker-facing) socket binds to; also handed to
// the ServerRequestHandler.
45 private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
// Worker count passed to the ServerRequestHandler constructor.
46 private final int HANDLER_WORKER_COUNT = 2;
// NOTE(review): no use of HWM is visible in this excerpt — confirm it is applied to a socket.
47 private final int HWM = 200;//high water mark on sockets
// Lifecycle state; volatile so that threads calling getStatus() observe updates
// made by the thread that starts or closes the server.
48 private volatile State status = State.STOPPED;
// Address string built during start-up, beginning with the detected host IP.
50 private String serverAddress;
// TCP port the frontend socket binds to ("tcp://*:<port>" in receive()).
51 private final int port;
// Lifecycle states of the server. The status field starts as STOPPED, is set to
// STARTING at the beginning of start-up and to STARTED once the listener task has
// been submitted; close() sets it back to STOPPED.
53 public static enum State {
54 STARTING, STARTED, STOPPED;
/**
 * Creates a server instance for the given port.
 *
 * NOTE(review): the constructor body is not visible in this excerpt; presumably it
 * stores {@code port} in the final {@code port} field — confirm.
 *
 * @param port port number for this server
 */
57 public ServerImpl(int port) {
/**
 * Reports the lifecycle state of this server.
 *
 * NOTE(review): the body is not visible in this excerpt; presumably it returns the
 * volatile {@code status} field — confirm.
 *
 * @return a {@link State} value
 */
61 public State getStatus() {
/**
 * Returns the request handler wrapped in a Guava {@code Optional}.
 *
 * @return the current {@code handler}, or an absent Optional while the field is null
 *         (it is only assigned once the listener task created by receive() is running)
 */
65 public Optional<ServerRequestHandler> getHandler() {
66 return Optional.fromNullable(this.handler);
/**
 * Stores the broker session for later retrieval through getBrokerSession().
 *
 * @param session the session to keep; no null check is performed here
 */
69 public void setBrokerSession(ProviderSession session) {
70 this.brokerSession = session;
/**
 * Returns the broker session wrapped in a Guava {@code Optional}.
 *
 * @return the stored session, or an absent Optional if none has been set
 */
73 public Optional<ProviderSession> getBrokerSession() {
74 return Optional.fromNullable(this.brokerSession);
/**
 * Returns the ZMQ context wrapped in a Guava {@code Optional}.
 *
 * @return the context, or an absent Optional while the {@code context} field is null
 *         (it is assigned during start-up)
 */
77 public Optional<ZMQ.Context> getZmqContext() {
78 return Optional.fromNullable(this.context);
/**
 * Returns the address string of this server.
 *
 * NOTE(review): the body is not visible in this excerpt; presumably it returns the
 * {@code serverAddress} field built during start-up — confirm.
 *
 * @return the server address as a String
 */
81 public String getServerAddress() {
/**
 * Returns the in-process endpoint used between the proxy's backend socket and the
 * request handler.
 *
 * @return the constant {@code HANDLER_INPROC_ADDRESS}
 */
85 public String getHandlerAddress() {
86 return HANDLER_INPROC_ADDRESS;
// Start-up sequence of the server.
// NOTE(review): the enclosing method signature is not visible in this excerpt; the
// log messages below indicate that this is the server start routine.
//
// Guard: only a STOPPED server may be started. Guava's checkState throws
// IllegalStateException with the given message when the condition is false.
93 Preconditions.checkState(State.STOPPED == this.getStatus(),
94 "Remote RPC Server is already running");
96 status = State.STARTING;
97 _logger.debug("Remote RPC Server is starting...");
// Resolve the local host IP that seeds serverAddress; null means none was found.
99 String hostIpAddress = findIpAddress();
101 //Log and silently die as per discussion in the bug (bug-362)
102 //https://bugs.opendaylight.org/show_bug.cgi?id=362
104 // A tracking enhancement defect (bug-366) is created to properly fix this issue
105 //https://bugs.opendaylight.org/show_bug.cgi?id=366
106 //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
108 if (hostIpAddress == null) {
109 _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
// NOTE(review): the remainder of this null-handling branch is not visible in this excerpt.
// Build serverAddress starting from the host IP.
// NOTE(review): the continuation of this builder chain is not visible in this excerpt.
114 this.serverAddress = new StringBuilder(hostIpAddress).
// Create the ZMQ context; the argument is the number of I/O threads.
119 context = ZMQ.context(1);
120 remoteServices = new HashSet<QName>();//
121 serverPool = Executors.newSingleThreadExecutor();//main server thread
122 serverPool.execute(receive()); // Start listening rpc requests
// STARTED is set as soon as the listener task has been submitted. The sockets are
// bound later, on the listener thread inside receive(), so STARTED does not by
// itself guarantee that the port is already bound.
124 status = State.STARTED;
125 _logger.info("Remote RPC Server started [{}]", getServerAddress());
/**
 * Stops the server. Does nothing when the server is already STOPPED; otherwise
 * shuts down the listener executor, marks the server STOPPED and logs the stop.
 */
136 public void close() {
138 if (State.STOPPED == this.getStatus()) return; //do nothing
// shutdown() only stops new submissions; it does not interrupt the listener task
// that is already running.
140 if (serverPool != null)
141 serverPool.shutdown();
// NOTE(review): the statements between the executor shutdown and the status change
// are not visible in this excerpt (presumably the ZMQ context is closed there — confirm).
145 status = State.STOPPED;
146 _logger.info("Remote RPC Server stopped");
150 * Closes the ZMQ context. It first tries to terminate the context gracefully. If
151 * termination takes more than 5 seconds, it is forcibly shut down.
153 private void closeZmqContext() {
// Termination runs on its own single-thread executor so that this method can bound
// how long it waits for it.
154 ExecutorService exec = Executors.newSingleThreadExecutor();
155 FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
// NOTE(review): the beginning of the task body is not visible in this excerpt;
// judging by the log messages it terminates the ZMQ context inside a try block.
162 _logger.debug("ZMQ Context terminated gracefully!");
163 } catch (Exception e) {
// NOTE(review): the only argument is a Throwable, so depending on the SLF4J version
// it may be logged as the exception (with stack trace) and "{}" left unfilled — confirm.
164 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
169 exec.execute(zmqTermination);
// Wait at most 5 seconds for the termination task to finish.
172 zmqTermination.get(5L, TimeUnit.SECONDS);
// Any failure of the wait, including a timeout, is deliberately ignored so that
// shutdown can proceed.
173 } catch (Exception e) {/*ignore and continue with shutdown*/}
// NOTE(review): no shutdown of `exec` is visible in these lines — confirm it is
// released, otherwise its worker thread outlives this call.
179 * Main listener thread that spawns {@link ServerRequestHandler} as workers.
// Builds the task executed on the server's single listener thread: it binds a
// ROUTER frontend for remote clients and a DEALER backend for the request handler,
// then proxies messages between the two.
183 private Runnable receive() {
184 return new Runnable() {
// NOTE(review): the run() method header is not visible in this excerpt.
188 Thread.currentThread().setName("remote-rpc-server");
189 _logger.debug("Remote RPC Server main thread starting...");
191 //socket clients connect to (frontend)
192 ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
194 //socket RequestHandlers connect to (backend)
195 ZMQ.Socket workers = context.socket(ZMQ.DEALER);
// try-with-resources: both capturePair and requestHandler are closed automatically
// when this block exits, whether normally or through an exception.
197 try (SocketPair capturePair = new SocketPair();
198 ServerRequestHandler requestHandler = new ServerRequestHandler(context,
200 HANDLER_WORKER_COUNT,
201 HANDLER_INPROC_ADDRESS,
202 getServerAddress());) {
// Publish the handler through the field so getHandler() can return it.
// NOTE(review): in the visible lines the field is not cleared after the try block
// closes requestHandler, so getHandler() may later expose a closed handler — confirm.
204 handler = requestHandler;
// Frontend listens on every local interface at the configured port.
206 clients.bind("tcp://*:" + port);
// Backend uses the in-process endpoint that the request handler was given above.
208 workers.bind(HANDLER_INPROC_ADDRESS);
209 //start worker threads
210 _logger.debug("Remote RPC Server worker threads starting...");
211 requestHandler.start();
212 //start capture thread
213 // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
214 // Connect work threads to client threads via a queue
// ZMQ.proxy blocks this thread while it forwards messages between the two sockets;
// the capture socket is disabled (null). capturePair is therefore created but unused.
215 ZMQ.proxy(clients, workers, null);//capturePair.getSender());
217 } catch (Exception e) {
// Only the exception class and message are logged, at debug level; no stack trace.
218 _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
// Socket cleanup and final log message.
// NOTE(review): whether these statements sit in a finally block is not visible in
// this excerpt.
220 if (clients != null) clients.close();
221 if (workers != null) workers.close();
222 _logger.info("Remote RPC Server stopped");
229 * Finds IPv4 address of the local VM
230 * TODO: This method is non-deterministic. There may be more than one IPv4 address. Can't say which
231 * address will be returned. Read the IP from a property file or enhance the code to make it deterministic.
232 * Should we use IP or hostname?
// Scans every network interface and each of its addresses for the first IPv4
// address that is not a loopback address. The start-up code treats a null result
// as "no usable address".
236 private String findIpAddress() {
237 Enumeration<?> e = null;
239 e = NetworkInterface.getNetworkInterfaces();
240 } catch (SocketException e1) {
241 _logger.error("Failed to get list of interfaces", e1);
// NOTE(review): the lines that follow this log statement are not visible in this
// excerpt. If the catch branch does not return, `e` is still null at the loop below;
// older JDK documentation also allows getNetworkInterfaces() to return null when no
// interfaces exist — confirm both cases are handled.
244 while (e.hasMoreElements()) {
246 NetworkInterface n = (NetworkInterface) e.nextElement();
248 Enumeration<?> ee = n.getInetAddresses();
249 while (ee.hasMoreElements()) {
250 InetAddress i = (InetAddress) ee.nextElement();
251 _logger.debug("Trying address {}", i);
// Accept only IPv4 addresses that are not loopback.
252 if ((i instanceof Inet4Address) && (!i.isLoopbackAddress())) {
253 String hostAddress = i.getHostAddress();
254 _logger.debug("Settled on host address {}", hostAddress);
// NOTE(review): the return statements are not visible in this excerpt; presumably
// hostAddress is returned here and null is returned after the error log below.
260 _logger.error("Failed to find a suitable host address");