2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Predicate;
13 import com.google.common.collect.Sets;
14 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
18 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
19 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
21 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
22 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.zeromq.ZMQ;
29 import java.net.Inet4Address;
30 import java.net.InetAddress;
31 import java.net.NetworkInterface;
32 import java.net.SocketException;
33 import java.util.Enumeration;
34 import java.util.HashSet;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.FutureTask;
40 import java.util.concurrent.TimeUnit;
42 import static com.google.common.base.Preconditions.checkNotNull;
43 import static com.google.common.base.Preconditions.checkState;
46 * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
47 * so that it gets route change notifications from routing table.
49 public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
// SLF4J logger used by this server and its inner listener.
51 private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
// Executor for the main receive loop; start() creates it as a single-thread executor.
53 private ExecutorService serverPool;
// Request handler assigned by the receive loop; null until that loop has created it.
54 protected ServerRequestHandler handler;
// RPC names checked by RpcListener to ignore registration notices that bounced back from the broker.
56 private Set<QName> remoteServices;
// Broker session used to add the RPC listener, add RPC implementations and list supported RPCs.
57 private ProviderSession brokerSession;
// ZMQ context created in start(); the receive loop creates its sockets from it.
58 private ZMQ.Context context;
// Listener added to the broker session in start(); it adds/removes global routes in the routing table.
60 private final RpcListener listener = new RpcListener();
// In-process address the worker-facing socket binds to; also passed to ServerRequestHandler.
62 private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
// Worker count passed to ServerRequestHandler.
63 private final int HANDLER_WORKER_COUNT = 2;
// NOTE(review): HWM is not referenced anywhere in the visible part of this file - confirm it is still needed.
64 private final int HWM = 200;//high water mark on sockets
// Lifecycle state; written by start() and close(), declared volatile.
65 private volatile State status = State.STOPPED;
// Address of this server; start() builds it starting from the host IP address.
67 private String serverAddress;
// Client handed to the broker as the RPC implementation when a route is updated.
70 private ClientImpl client;
// Supplies the routing table used to publish, remove and enumerate routes.
72 private RoutingTableProvider routingTableProvider;
/**
 * Lifecycle states of the server, in the order they are reached during a
 * normal start: {@code STARTING}, then {@code STARTED}; {@code STOPPED} is
 * the initial and final state.
 */
public enum State {
    STARTING,
    STARTED,
    STOPPED
}
/**
 * Creates a server instance.
 *
 * NOTE(review): the constructor body is not visible in this excerpt. The receive
 * loop binds its client-facing socket to "tcp://*:" + port, so this argument is
 * presumably stored as that port - confirm.
 *
 * @param port port number for this server
 */
78 public ServerImpl(int port) {
82 public RoutingTableProvider getRoutingTableProvider() {
83 return routingTableProvider;
86 public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
87 this.routingTableProvider = routingTableProvider;
/**
 * Accessor for the {@link ClientImpl} associated with this server.
 *
 * NOTE(review): the method body is not visible in this excerpt; presumably it
 * returns the client field - confirm.
 */
90 public ClientImpl getClient(){
/**
 * Mutator for the {@link ClientImpl} associated with this server.
 *
 * NOTE(review): the method body is not visible in this excerpt; presumably it
 * stores the argument in the client field - confirm.
 *
 * @param client client to associate with this server
 */
94 public void setClient(ClientImpl client) {
/**
 * Accessor for the lifecycle {@link State} of this server; start() and close()
 * consult it before doing any work.
 *
 * NOTE(review): the method body is not visible in this excerpt; presumably it
 * returns the status field - confirm.
 */
98 public State getStatus() {
102 public Optional<ServerRequestHandler> getHandler() {
103 return Optional.fromNullable(this.handler);
106 public void setBrokerSession(ProviderSession session) {
107 this.brokerSession = session;
110 public Optional<ProviderSession> getBrokerSession() {
111 return Optional.fromNullable(this.brokerSession);
114 public Optional<ZMQ.Context> getZmqContext() {
115 return Optional.fromNullable(this.context);
118 public String getServerAddress() {
119 return serverAddress;
122 public String getHandlerAddress() {
123 return HANDLER_INPROC_ADDRESS;
/**
 * Starts the Remote RPC server.
 *
 * Requires the current status to be STOPPED, then moves to STARTING, looks up
 * the host IP address, creates the ZMQ context, launches the receive loop on a
 * single-thread executor, adds the RPC registration listener to the broker
 * session, registers the remote RPCs found in the routing table and finally
 * marks the server as STARTED.
 *
 * @throws IllegalStateException if the server is not in the STOPPED state
 */
129 public void start() {
130 Preconditions.checkState(State.STOPPED == this.getStatus(),
131 "Remote RPC Server is already running");
133 status = State.STARTING;
134 _logger.debug("Remote RPC Server is starting...");
136 String hostIpAddress = findIpAddress();
138 //Log and silently die as per discussion in the bug (bug-362)
139 //https://bugs.opendaylight.org/show_bug.cgi?id=362
141 // A tracking enhancement defect (bug-366) is created to properly fix this issue
142 //https://bugs.opendaylight.org/show_bug.cgi?id=366
143 //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
// No usable host address: an error is logged instead of throwing (see the bug references above).
145 if (hostIpAddress == null) {
146 _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
// The server address is built starting from the host IP address.
// NOTE(review): the rest of this expression is not visible in this excerpt.
151 this.serverAddress = new StringBuilder(hostIpAddress).
156 context = ZMQ.context(1);
157 remoteServices = new HashSet<QName>();//
158 serverPool = Executors.newSingleThreadExecutor();//main server thread
159 serverPool.execute(receive()); // Start listening rpc requests
// From here on the broker notifies the listener about RPC implementations being added or removed.
160 brokerSession.addRpcRegistrationListener(listener);
// Routes already present in the routing table are registered with the broker.
164 registerRemoteRpcs();
166 status = State.STARTED;
167 _logger.info("Remote RPC Server started [{}]", getServerAddress());
/**
 * Stops the server.
 *
 * Does nothing when the status is already STOPPED. Otherwise it removes the
 * routes of the locally supported RPCs from the routing table, shuts down the
 * server executor if one was created, sets the status to STOPPED and logs the
 * shutdown.
 */
178 public void close() {
180 if (State.STOPPED == this.getStatus()) return; //do nothing
// Each RPC currently supported by the broker session is reported to the listener as removed.
182 unregisterLocalRpcs();
// serverPool is only created by start(), so it can still be null here.
184 if (serverPool != null)
185 serverPool.shutdown();
189 status = State.STOPPED;
190 _logger.info("Remote RPC Server stopped");
194 * Closes ZMQ Context. It tries to gracefully terminate the context. If
195 * termination takes more than five seconds, it is forcefully shut down.
// Runs the context termination as a task on its own single-thread executor so
// that the wait for it can be bounded by a timeout.
198 private void closeZmqContext() {
199 ExecutorService exec = Executors.newSingleThreadExecutor();
// NOTE(review): FutureTask is used as a raw type here; the body of the Runnable
// is not visible in this excerpt.
200 FutureTask zmqTermination = new FutureTask(new Runnable() {
206 _logger.debug("ZMQ Context terminated gracefully!");
207 } catch (Exception e) {
208 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
213 exec.execute(zmqTermination);
// Wait at most five seconds for the termination task; any failure, including a
// timeout, is deliberately ignored so that shutdown can proceed.
216 zmqTermination.get(5L, TimeUnit.SECONDS);
217 } catch (Exception e) {/*ignore and continue with shutdown*/}
223 * Main listener thread that spawns {@link ServerRequestHandler} as workers.
/**
 * Builds the task that start() submits to the server executor.
 *
 * The task names its thread "remote-rpc-server", creates a ROUTER socket for
 * clients and a DEALER socket for workers, creates a ServerRequestHandler,
 * binds the client socket to "tcp://*:" + port and the worker socket to the
 * handler's inproc address, starts the handler and then proxies messages
 * between the two sockets. Both sockets are closed afterwards.
 *
 * @return the runnable implementing the receive loop
 */
227 private Runnable receive() {
228 return new Runnable() {
232 Thread.currentThread().setName("remote-rpc-server");
233 _logger.debug("Remote RPC Server main thread starting...");
235 //socket clients connect to (frontend)
236 ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
238 //socket RequestHandlers connect to (backend)
239 ZMQ.Socket workers = context.socket(ZMQ.DEALER);
// Both resources are declared in try-with-resources, so they are closed when the block exits.
// NOTE(review): one constructor argument line is not visible in this excerpt.
241 try (SocketPair capturePair = new SocketPair();
242 ServerRequestHandler requestHandler = new ServerRequestHandler(context,
244 HANDLER_WORKER_COUNT,
245 HANDLER_INPROC_ADDRESS,
246 getServerAddress());) {
// Published through the field so that getHandler() can expose it.
248 handler = requestHandler;
250 clients.bind("tcp://*:" + port);
252 workers.bind(HANDLER_INPROC_ADDRESS);
253 //start worker threads
254 _logger.debug("Remote RPC Server worker threads starting...");
255 requestHandler.start();
256 //start capture thread
257 // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
258 // Connect work threads to client threads via a queue
259 ZMQ.proxy(clients, workers, null);//capturePair.getSender());
// NOTE(review): failures of the receive loop are logged at debug level only,
// without the stack trace - consider whether that is intended.
261 } catch (Exception e) {
262 _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
// NOTE(review): presumably these closes sit in a finally block; the enclosing
// line is not visible in this excerpt.
264 if (clients != null) clients.close();
265 if (workers != null) workers.close();
266 _logger.info("Remote RPC Server stopped");
273 * Register the remote RPCs from the routing table into broker
275 private void registerRemoteRpcs(){
276 Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
278 Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
280 Set<Map.Entry> remoteRoutes =
281 routingTableProvider.getRoutingTable().get().getAllRoutes();
283 //filter out all entries that contains local address
284 //we dont want to register local RPCs as remote
285 Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
286 public boolean apply(Map.Entry remoteRoute){
287 return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
291 //filter the entries created by current node
292 Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
294 for (Map.Entry route : filteredRemoteRoutes){
295 onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
300 * Un-Register the local RPCs from the routing table
302 private void unregisterLocalRpcs(){
303 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
304 for (QName rpc : currentlySupported) {
305 listener.onRpcImplementationRemoved(rpc);
310 * Publish all the locally registered RPCs in the routing table
312 private void announceLocalRpcs(){
313 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
314 for (QName rpc : currentlySupported) {
315 listener.onRpcImplementationAdded(rpc);
324 public void onRouteUpdated(String key, String value) {
325 RouteIdentifierImpl rId = new RouteIdentifierImpl();
327 _logger.debug("Updating key/value {}-{}", key, value);
328 brokerSession.addRpcImplementation(
329 (QName) rId.fromString(key).getType(), client);
331 //TODO: Check with Tony for routed rpc
332 //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
333 } catch (Exception e) {
334 _logger.info("Route update failed {}", e);
342 public void onRouteDeleted(String key) {
343 //TODO: Broker session needs to be updated to support this
344 throw new UnsupportedOperationException();
348 * Finds a site-local IPv4 address of the local VM
349 * TODO: This method is non-deterministic. There may be more than one IPv4 address. Can't say which
350 * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
351 * Should we use IP or hostname?
355 private String findIpAddress() {
356 Enumeration e = null;
358 e = NetworkInterface.getNetworkInterfaces();
359 } catch (SocketException e1) {
360 _logger.error("Failed to get list of interfaces", e1);
361 //throw new RuntimeException("Failed to acquire list of interfaces", e1);
364 while (e.hasMoreElements()) {
366 NetworkInterface n = (NetworkInterface) e.nextElement();
368 Enumeration ee = n.getInetAddresses();
369 while (ee.hasMoreElements()) {
370 InetAddress i = (InetAddress) ee.nextElement();
371 _logger.debug("Trying address {}", i);
372 if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) {
373 String hostAddress = i.getHostAddress();
374 _logger.debug("Settled on host address {}", hostAddress);
380 _logger.error("Failed to find a suitable host address");
385 * Listener for rpc registrations
387 private class RpcListener implements RpcRegistrationListener {
390 public void onRpcImplementationAdded(QName name) {
392 //if the service name exists in the set, this notice
393 //has bounced back from the broker. It should be ignored
394 if (remoteServices.contains(name))
397 _logger.debug("Adding registration for [{}]", name);
398 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
399 routeId.setType(name);
401 RoutingTable<String, String> routingTable = getRoutingTable();
404 routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
405 _logger.debug("Route added [{}-{}]", name, getServerAddress());
407 } catch (RoutingTableException | SystemException e) {
408 //TODO: This can be thrown when route already exists in the table. Broker
409 //needs to handle this.
410 _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
416 public void onRpcImplementationRemoved(QName name) {
418 _logger.debug("Removing registration for [{}]", name);
419 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
420 routeId.setType(name);
422 RoutingTable<String, String> routingTable = getRoutingTable();
425 routingTable.removeGlobalRoute(routeId.toString());
426 } catch (RoutingTableException | SystemException e) {
427 _logger.error("Route delete failed {}", e);
431 private RoutingTable<String, String> getRoutingTable(){
432 Optional<RoutingTable<String, String>> routingTable =
433 routingTableProvider.getRoutingTable();
435 checkNotNull(routingTable.isPresent(), "Routing table is null");
437 return routingTable.get();
442 * Listener for Route changes in broker. Broker notifies this listener in the event
443 * of any change (add/delete). Listener then updates the routing table.
445 private class BrokerRouteChangeListener
446 implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
449 public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {