2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Predicate;
13 import com.google.common.collect.Sets;
14 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
18 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
19 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
21 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
22 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.zeromq.ZMQ;
29 import java.net.Inet4Address;
30 import java.net.InetAddress;
31 import java.net.NetworkInterface;
32 import java.net.SocketException;
33 import java.util.Enumeration;
34 import java.util.HashSet;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.FutureTask;
40 import java.util.concurrent.TimeUnit;
42 import static com.google.common.base.Preconditions.checkNotNull;
45 * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
46 * so that it gets route change notifications from routing table.
48 public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
50 private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
52 private ExecutorService serverPool;
53 protected ServerRequestHandler handler;
55 private Set<QName> remoteServices;
56 private ProviderSession brokerSession;
57 private ZMQ.Context context;
59 private final RpcListener listener = new RpcListener();
61 private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
62 private final int HANDLER_WORKER_COUNT = 2;
63 private final int HWM = 200;//high water mark on sockets
64 private volatile State status = State.STOPPED;
66 private String serverAddress;
69 private ClientImpl client;
71 private RoutingTableProvider routingTableProvider;
73 public static enum State {
74 STARTING, STARTED, STOPPED;
77 public ServerImpl(int port) {
79 this.serverAddress = new StringBuilder(findIpAddress()).
85 public RoutingTableProvider getRoutingTableProvider() {
86 return routingTableProvider;
89 public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
90 this.routingTableProvider = routingTableProvider;
93 public ClientImpl getClient(){
97 public void setClient(ClientImpl client) {
101 public State getStatus() {
105 public Optional<ServerRequestHandler> getHandler() {
106 return Optional.fromNullable(this.handler);
109 public void setBrokerSession(ProviderSession session) {
110 this.brokerSession = session;
113 public Optional<ProviderSession> getBrokerSession() {
114 return Optional.fromNullable(this.brokerSession);
117 public Optional<ZMQ.Context> getZmqContext() {
118 return Optional.fromNullable(this.context);
121 public String getServerAddress() {
122 return serverAddress;
125 public String getHandlerAddress() {
126 return HANDLER_INPROC_ADDRESS;
132 public void start() {
133 Preconditions.checkState(State.STOPPED == this.getStatus(),
134 "Remote RPC Server is already running");
136 status = State.STARTING;
137 context = ZMQ.context(1);
138 remoteServices = new HashSet<QName>();//
139 serverPool = Executors.newSingleThreadExecutor();//main server thread
140 serverPool.execute(receive()); // Start listening rpc requests
141 brokerSession.addRpcRegistrationListener(listener);
145 registerRemoteRpcs();
147 status = State.STARTED;
148 _logger.info("Remote RPC Server started [{}]", getServerAddress());
159 public void close() {
161 if (State.STOPPED == this.getStatus()) return; //do nothing
163 unregisterLocalRpcs();
165 if (serverPool != null)
166 serverPool.shutdown();
170 status = State.STOPPED;
171 _logger.info("Remote RPC Server stopped");
175 * Closes ZMQ Context. It tries to gracefully terminate the context. If
176 * termination takes more than a second, its forcefully shutdown.
178 private void closeZmqContext() {
179 ExecutorService exec = Executors.newSingleThreadExecutor();
180 FutureTask zmqTermination = new FutureTask(new Runnable() {
187 _logger.debug("ZMQ Context terminated gracefully!");
188 } catch (Exception e) {
189 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
194 exec.execute(zmqTermination);
197 zmqTermination.get(5L, TimeUnit.SECONDS);
198 } catch (Exception e) {/*ignore and continue with shutdown*/}
204 * Main listener thread that spawns {@link ServerRequestHandler} as workers.
208 private Runnable receive() {
209 return new Runnable() {
213 Thread.currentThread().setName("remote-rpc-server");
214 _logger.debug("Remote RPC Server main thread starting...");
216 //socket clients connect to (frontend)
217 ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
219 //socket RequestHandlers connect to (backend)
220 ZMQ.Socket workers = context.socket(ZMQ.DEALER);
222 try (SocketPair capturePair = new SocketPair();
223 ServerRequestHandler requestHandler = new ServerRequestHandler(context,
225 HANDLER_WORKER_COUNT,
226 HANDLER_INPROC_ADDRESS,
227 getServerAddress());) {
229 handler = requestHandler;
231 clients.bind("tcp://*:" + port);
233 workers.bind(HANDLER_INPROC_ADDRESS);
234 //start worker threads
235 _logger.debug("Remote RPC Server worker threads starting...");
236 requestHandler.start();
237 //start capture thread
238 // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
239 // Connect work threads to client threads via a queue
240 ZMQ.proxy(clients, workers, null);//capturePair.getSender());
242 } catch (Exception e) {
243 _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
245 if (clients != null) clients.close();
246 if (workers != null) workers.close();
247 _logger.info("Remote RPC Server stopped");
254 * Register the remote RPCs from the routing table into broker
256 private void registerRemoteRpcs(){
257 Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
259 Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
261 Set<Map.Entry> remoteRoutes =
262 routingTableProvider.getRoutingTable().get().getAllRoutes();
264 //filter out all entries that contains local address
265 //we dont want to register local RPCs as remote
266 Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
267 public boolean apply(Map.Entry remoteRoute){
268 return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
272 //filter the entries created by current node
273 Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
275 for (Map.Entry route : filteredRemoteRoutes){
276 onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
281 * Un-Register the local RPCs from the routing table
283 private void unregisterLocalRpcs(){
284 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
285 for (QName rpc : currentlySupported) {
286 listener.onRpcImplementationRemoved(rpc);
291 * Publish all the locally registered RPCs in the routing table
293 private void announceLocalRpcs(){
294 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
295 for (QName rpc : currentlySupported) {
296 listener.onRpcImplementationAdded(rpc);
305 public void onRouteUpdated(String key, String value) {
306 RouteIdentifierImpl rId = new RouteIdentifierImpl();
308 _logger.debug("Updating key/value {}-{}", key, value);
309 brokerSession.addRpcImplementation(
310 (QName) rId.fromString(key).getType(), client);
312 //TODO: Check with Tony for routed rpc
313 //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
314 } catch (Exception e) {
315 _logger.info("Route update failed {}", e);
323 public void onRouteDeleted(String key) {
324 //TODO: Broker session needs to be updated to support this
325 throw new UnsupportedOperationException();
329 * Finds IPv4 address of the local VM
330 * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
331 * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
332 * Should we use IP or hostname?
336 private String findIpAddress() {
337 String hostAddress = null;
338 Enumeration e = null;
340 e = NetworkInterface.getNetworkInterfaces();
341 } catch (SocketException e1) {
342 e1.printStackTrace();
344 while (e.hasMoreElements()) {
346 NetworkInterface n = (NetworkInterface) e.nextElement();
348 Enumeration ee = n.getInetAddresses();
349 while (ee.hasMoreElements()) {
350 InetAddress i = (InetAddress) ee.nextElement();
351 if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
352 hostAddress = i.getHostAddress();
360 * Listener for rpc registrations
362 private class RpcListener implements RpcRegistrationListener {
365 public void onRpcImplementationAdded(QName name) {
367 //if the service name exists in the set, this notice
368 //has bounced back from the broker. It should be ignored
369 if (remoteServices.contains(name))
372 _logger.debug("Adding registration for [{}]", name);
373 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
374 routeId.setType(name);
376 RoutingTable<String, String> routingTable = getRoutingTable();
379 routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
380 _logger.debug("Route added [{}-{}]", name, getServerAddress());
382 } catch (RoutingTableException | SystemException e) {
383 //TODO: This can be thrown when route already exists in the table. Broker
384 //needs to handle this.
385 _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
391 public void onRpcImplementationRemoved(QName name) {
393 _logger.debug("Removing registration for [{}]", name);
394 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
395 routeId.setType(name);
397 RoutingTable<String, String> routingTable = getRoutingTable();
400 routingTable.removeGlobalRoute(routeId.toString());
401 } catch (RoutingTableException | SystemException e) {
402 _logger.error("Route delete failed {}", e);
406 private RoutingTable<String, String> getRoutingTable(){
407 Optional<RoutingTable<String, String>> routingTable =
408 routingTableProvider.getRoutingTable();
410 checkNotNull(routingTable.isPresent(), "Routing table is null");
412 return routingTable.get();
417 * Listener for Route changes in broker. Broker notifies this listener in the event
418 * of any change (add/delete). Listener then updates the routing table.
420 private class BrokerRouteChangeListener
421 implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
424 public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {