2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Predicate;
13 import com.google.common.collect.Sets;
14 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
18 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
19 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
21 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
22 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
23 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
24 import org.opendaylight.yangtools.yang.common.QName;
25 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.zeromq.ZMQ;
30 import java.net.Inet4Address;
31 import java.net.InetAddress;
32 import java.net.NetworkInterface;
33 import java.net.SocketException;
34 import java.util.Enumeration;
35 import java.util.HashSet;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.FutureTask;
41 import java.util.concurrent.TimeUnit;
43 import static com.google.common.base.Preconditions.checkNotNull;
44 import static com.google.common.base.Preconditions.checkState;
47 * ZeroMQ-based implementation of {@link RemoteRpcServer}.
49 public class ServerImpl implements RemoteRpcServer {
// Logger used for the server's lifecycle and worker-thread messages.
51 private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
// Executor that runs the main server task; created in start() and shut down in close().
53 private ExecutorService serverPool;
// Request handler; assigned inside the receive() runnable once it has been constructed.
54 protected ServerRequestHandler handler;
// Replaced with a fresh, empty HashSet each time start() runs.
56 private Set<QName> remoteServices;
// Broker session supplied through setBrokerSession().
57 private ProviderSession brokerSession;
// ZeroMQ context created in start(); the sockets in receive() are opened on it.
58 private ZMQ.Context context;
// In-process endpoint the backend (DEALER) socket binds to; also passed to ServerRequestHandler.
60 private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
// Worker count passed to ServerRequestHandler.
61 private final int HANDLER_WORKER_COUNT = 2;
62 private final int HWM = 200;//high water mark on sockets
// Lifecycle state; volatile, initially STOPPED, updated by start() and close().
63 private volatile State status = State.STOPPED;
// Built in start(), beginning with the host IP address returned by findIpAddress().
65 private String serverAddress;
// Lifecycle states of the server; the status field always holds one of these values.
68 public static enum State {
69 STARTING, STARTED, STOPPED;
72 public ServerImpl(int port) {
76 public State getStatus() {
// Returns the request handler wrapped in a Guava Optional; absent while the handler field is null.
80 public Optional<ServerRequestHandler> getHandler() {
81 return Optional.fromNullable(this.handler);
// Stores the broker session that getBrokerSession() later exposes; the argument is not null-checked.
84 public void setBrokerSession(ProviderSession session) {
85 this.brokerSession = session;
// Returns the broker session wrapped in a Guava Optional; absent while the brokerSession field is null.
88 public Optional<ProviderSession> getBrokerSession() {
89 return Optional.fromNullable(this.brokerSession);
// Returns the ZeroMQ context wrapped in a Guava Optional; absent while the context field is null.
92 public Optional<ZMQ.Context> getZmqContext() {
93 return Optional.fromNullable(this.context);
96 public String getServerAddress() {
// Returns the in-process address (HANDLER_INPROC_ADDRESS) that the worker-facing socket binds to.
100 public String getHandlerAddress() {
101 return HANDLER_INPROC_ADDRESS;
// Starts the server: requires getStatus() to report STOPPED (otherwise checkState throws
// IllegalStateException), looks up the host IP address, creates the ZeroMQ context and
// submits the receive() runnable to a single-thread executor.
107 public void start() {
108 Preconditions.checkState(State.STOPPED == this.getStatus(),
109 "Remote RPC Server is already running");
111 status = State.STARTING;
112 _logger.debug("Remote RPC Server is starting...");
114 String hostIpAddress = findIpAddress();
116 //Log and silently die as per discussion in the bug (bug-362)
117 //https://bugs.opendaylight.org/show_bug.cgi?id=362
119 // A tracking enhancement defect (bug-366) is created to properly fix this issue
120 //https://bugs.opendaylight.org/show_bug.cgi?id=366
121 //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
// findIpAddress() yielded no address: log an error instead of throwing (see the bug notes above).
123 if (hostIpAddress == null) {
124 _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
// Build serverAddress, starting from the host IP address.
129 this.serverAddress = new StringBuilder(hostIpAddress).
// ZMQ.context(1): the argument is the number of I/O threads for the context.
134 context = ZMQ.context(1);
135 remoteServices = new HashSet<QName>();//
136 serverPool = Executors.newSingleThreadExecutor();//main server thread
137 serverPool.execute(receive()); // Start listening rpc requests
// The status flips to STARTED right after the task is submitted; this method does not wait
// for the sockets in receive() to be bound.
139 status = State.STARTED;
140 _logger.info("Remote RPC Server started [{}]", getServerAddress());
// Stops the server. Returns immediately when getStatus() already reports STOPPED; otherwise
// shuts down the server executor (if one was created) and marks the server STOPPED.
151 public void close() {
153 if (State.STOPPED == this.getStatus()) return; //do nothing
// shutdown() stops the executor from accepting new tasks; it does not interrupt a running task.
155 if (serverPool != null)
156 serverPool.shutdown();
160 status = State.STOPPED;
161 _logger.info("Remote RPC Server stopped");
165 * Closes ZMQ Context. It tries to gracefully terminate the context. If
166 * termination takes more than 5 seconds, it is forcefully shut down.
168 private void closeZmqContext() {
// A separate single-thread executor runs the termination so the caller can wait with a timeout.
169 ExecutorService exec = Executors.newSingleThreadExecutor();
// NOTE(review): FutureTask is used as a raw type here.
170 FutureTask zmqTermination = new FutureTask(new Runnable() {
177 _logger.debug("ZMQ Context terminated gracefully!");
178 } catch (Exception e) {
// A failure during termination is logged at debug level only.
179 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
184 exec.execute(zmqTermination);
// Wait at most 5 seconds for the termination task to complete.
187 zmqTermination.get(5L, TimeUnit.SECONDS);
// Every exception from get(), including a timeout, is deliberately swallowed.
188 } catch (Exception e) {/*ignore and continue with shutdown*/}
194 * Main listener thread that spawns {@link ServerRequestHandler} as workers.
198 private Runnable receive() {
199 return new Runnable() {
// Give the executing thread a recognizable name.
203 Thread.currentThread().setName("remote-rpc-server");
204 _logger.debug("Remote RPC Server main thread starting...");
206 //socket clients connect to (frontend)
207 ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
209 //socket RequestHandlers connect to (backend)
210 ZMQ.Socket workers = context.socket(ZMQ.DEALER);
// try-with-resources: capturePair and requestHandler are closed automatically when the block exits.
212 try (SocketPair capturePair = new SocketPair();
213 ServerRequestHandler requestHandler = new ServerRequestHandler(context,
215 HANDLER_WORKER_COUNT,
216 HANDLER_INPROC_ADDRESS,
217 getServerAddress());) {
// Publish the handler in the field that getHandler() reads.
219 handler = requestHandler;
// Frontend: TCP wildcard bind ("*") on the server port.
221 clients.bind("tcp://*:" + port);
// Backend: in-process endpoint used by the request-handler workers.
223 workers.bind(HANDLER_INPROC_ADDRESS);
224 //start worker threads
225 _logger.debug("Remote RPC Server worker threads starting...");
226 requestHandler.start();
227 //start capture thread
228 // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
229 // Connect work threads to client threads via a queue
// The capture socket argument is null, so capturePair's sender is not attached to the proxy.
230 ZMQ.proxy(clients, workers, null);//capturePair.getSender());
232 } catch (Exception e) {
// Only the exception class and message are logged (at debug level); no stack trace.
233 _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
// Release both sockets.
235 if (clients != null) clients.close();
236 if (workers != null) workers.close();
237 _logger.info("Remote RPC Server stopped");
244 * Finds IPv4 address of the local VM
245 * TODO: This method is non-deterministic. There may be more than one IPv4 address. Can't say which
246 * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
247 * Should we use IP or hostname?
251 private String findIpAddress() {
252 Enumeration e = null;
254 e = NetworkInterface.getNetworkInterfaces();
255 } catch (SocketException e1) {
256 _logger.error("Failed to get list of interfaces", e1);
257 //throw new RuntimeException("Failed to acquire list of interfaces", e1);
260 while (e.hasMoreElements()) {
262 NetworkInterface n = (NetworkInterface) e.nextElement();
264 Enumeration ee = n.getInetAddresses();
265 while (ee.hasMoreElements()) {
266 InetAddress i = (InetAddress) ee.nextElement();
267 _logger.debug("Trying address {}", i);
268 if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) {
269 String hostAddress = i.getHostAddress();
270 _logger.debug("Settled on host address {}", hostAddress);
276 _logger.error("Failed to find a suitable host address");