Merge "Fix a few eclipse-reported warnings"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Predicate;
13 import com.google.common.collect.Sets;
14 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
18 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
19 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
21 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
22 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.zeromq.ZMQ;
28
29 import java.net.Inet4Address;
30 import java.net.InetAddress;
31 import java.net.NetworkInterface;
32 import java.net.SocketException;
33 import java.util.Enumeration;
34 import java.util.HashSet;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.FutureTask;
40 import java.util.concurrent.TimeUnit;
41
42 import static com.google.common.base.Preconditions.checkNotNull;
43 import static com.google.common.base.Preconditions.checkState;
44
45 /**
46  * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
47  * so that it gets route change notifications from routing table.
48  */
49 public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
50
51   private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
52
53   private ExecutorService serverPool;
54   protected ServerRequestHandler handler;
55
56   private Set<QName> remoteServices;
57   private ProviderSession brokerSession;
58   private ZMQ.Context context;
59
60   private final RpcListener listener = new RpcListener();
61
62   private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
63   private final int HANDLER_WORKER_COUNT = 2;
64   private final int HWM = 200;//high water mark on sockets
65   private volatile State status = State.STOPPED;
66
67   private String serverAddress;
68   private int port;
69
70   private ClientImpl client;
71
72   private  RoutingTableProvider routingTableProvider;
73
74   public static enum State {
75     STARTING, STARTED, STOPPED;
76   }
77
78   public ServerImpl(int port) {
79     this.port = port;
80   }
81
82   public RoutingTableProvider getRoutingTableProvider() {
83     return routingTableProvider;
84   }
85
86   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
87     this.routingTableProvider = routingTableProvider;
88   }
89
90   public ClientImpl getClient(){
91     return this.client;
92   }
93
94   public void setClient(ClientImpl client) {
95     this.client = client;
96   }
97
98   public State getStatus() {
99     return this.status;
100   }
101
102   public Optional<ServerRequestHandler> getHandler() {
103     return Optional.fromNullable(this.handler);
104   }
105
106   public void setBrokerSession(ProviderSession session) {
107     this.brokerSession = session;
108   }
109
110   public Optional<ProviderSession> getBrokerSession() {
111     return Optional.fromNullable(this.brokerSession);
112   }
113
114   public Optional<ZMQ.Context> getZmqContext() {
115     return Optional.fromNullable(this.context);
116   }
117
118   public String getServerAddress() {
119     return serverAddress;
120   }
121
122   public String getHandlerAddress() {
123     return HANDLER_INPROC_ADDRESS;
124   }
125
126   /**
127    *
128    */
129   public void start() {
130     Preconditions.checkState(State.STOPPED == this.getStatus(),
131         "Remote RPC Server is already running");
132
133     status = State.STARTING;
134     _logger.debug("Remote RPC Server is starting...");
135
136     String hostIpAddress = findIpAddress();
137
138     //Log and silently die as per discussion in the bug (bug-362)
139     //https://bugs.opendaylight.org/show_bug.cgi?id=362
140     //
141     // A tracking enhancement defect (bug-366) is created to properly fix this issue
142     //https://bugs.opendaylight.org/show_bug.cgi?id=366
143     //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
144
145     if (hostIpAddress == null) {
146       _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
147       stop();
148       return;
149     }
150
151     this.serverAddress = new StringBuilder(hostIpAddress).
152         append(":").
153         append(port).
154         toString();
155
156     context = ZMQ.context(1);
157     remoteServices = new HashSet<QName>();//
158     serverPool = Executors.newSingleThreadExecutor();//main server thread
159     serverPool.execute(receive()); // Start listening rpc requests
160     brokerSession.addRpcRegistrationListener(listener);
161
162     announceLocalRpcs();
163
164     registerRemoteRpcs();
165
166     status = State.STARTED;
167     _logger.info("Remote RPC Server started [{}]", getServerAddress());
168   }
169
170   public void stop(){
171     close();
172   }
173
174   /**
175    *
176    */
177   @Override
178   public void close() {
179
180     if (State.STOPPED == this.getStatus()) return; //do nothing
181
182     unregisterLocalRpcs();
183
184     if (serverPool != null)
185       serverPool.shutdown();
186
187     closeZmqContext();
188
189     status = State.STOPPED;
190     _logger.info("Remote RPC Server stopped");
191   }
192
193   /**
194    * Closes ZMQ Context. It tries to gracefully terminate the context. If
195    * termination takes more than a second, its forcefully shutdown.
196    */
197   private void closeZmqContext() {
198     ExecutorService exec = Executors.newSingleThreadExecutor();
199     FutureTask zmqTermination = new FutureTask(new Runnable() {
200
201       @Override
202       public void run() {
203         try {
204           if (context != null)
205             context.term();
206           _logger.debug("ZMQ Context terminated gracefully!");
207         } catch (Exception e) {
208           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
209         }
210       }
211     }, null);
212
213     exec.execute(zmqTermination);
214
215     try {
216       zmqTermination.get(5L, TimeUnit.SECONDS);
217     } catch (Exception e) {/*ignore and continue with shutdown*/}
218
219     exec.shutdownNow();
220   }
221
222   /**
223    * Main listener thread that spawns {@link ServerRequestHandler} as workers.
224    *
225    * @return
226    */
227   private Runnable receive() {
228     return new Runnable() {
229
230       @Override
231       public void run() {
232         Thread.currentThread().setName("remote-rpc-server");
233         _logger.debug("Remote RPC Server main thread starting...");
234
235         //socket clients connect to (frontend)
236         ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
237
238         //socket RequestHandlers connect to (backend)
239         ZMQ.Socket workers = context.socket(ZMQ.DEALER);
240
241         try (SocketPair capturePair = new SocketPair();
242              ServerRequestHandler requestHandler = new ServerRequestHandler(context,
243                  brokerSession,
244                  HANDLER_WORKER_COUNT,
245                  HANDLER_INPROC_ADDRESS,
246                  getServerAddress());) {
247
248           handler = requestHandler;
249           clients.setHWM(HWM);
250           clients.bind("tcp://*:" + port);
251           workers.setHWM(HWM);
252           workers.bind(HANDLER_INPROC_ADDRESS);
253           //start worker threads
254           _logger.debug("Remote RPC Server worker threads starting...");
255           requestHandler.start();
256           //start capture thread
257           // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
258           //  Connect work threads to client threads via a queue
259           ZMQ.proxy(clients, workers, null);//capturePair.getSender());
260
261         } catch (Exception e) {
262           _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
263         } finally {
264           if (clients != null) clients.close();
265           if (workers != null) workers.close();
266           _logger.info("Remote RPC Server stopped");
267         }
268       }
269     };
270   }
271
272   /**
273    * Register the remote RPCs from the routing table into broker
274    */
275   private void registerRemoteRpcs(){
276     Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
277
278     Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
279
280     Set<Map.Entry> remoteRoutes =
281             routingTableProvider.getRoutingTable().get().getAllRoutes();
282
283     //filter out all entries that contains local address
284     //we dont want to register local RPCs as remote
285     Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
286       public boolean apply(Map.Entry remoteRoute){
287         return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
288       }
289     };
290
291     //filter the entries created by current node
292     Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
293
294     for (Map.Entry route : filteredRemoteRoutes){
295       onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
296     }
297   }
298
299   /**
300    * Un-Register the local RPCs from the routing table
301    */
302   private void unregisterLocalRpcs(){
303     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
304     for (QName rpc : currentlySupported) {
305       listener.onRpcImplementationRemoved(rpc);
306     }
307   }
308
309   /**
310    * Publish all the locally registered RPCs in the routing table
311    */
312   private void announceLocalRpcs(){
313     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
314     for (QName rpc : currentlySupported) {
315       listener.onRpcImplementationAdded(rpc);
316     }
317   }
318
319   /**
320    * @param key
321    * @param value
322    */
323   @Override
324   public void onRouteUpdated(String key, String value) {
325     RouteIdentifierImpl rId = new RouteIdentifierImpl();
326     try {
327       _logger.debug("Updating key/value {}-{}", key, value);
328       brokerSession.addRpcImplementation(
329           (QName) rId.fromString(key).getType(), client);
330
331       //TODO: Check with Tony for routed rpc
332       //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
333     } catch (Exception e) {
334       _logger.info("Route update failed {}", e);
335     }
336   }
337
338   /**
339    * @param key
340    */
341   @Override
342   public void onRouteDeleted(String key) {
343     //TODO: Broker session needs to be updated to support this
344     throw new UnsupportedOperationException();
345   }
346
347   /**
348    * Finds IPv4 address of the local VM
349    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
350    * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
351    * Should we use IP or hostname?
352    *
353    * @return
354    */
355   private String findIpAddress() {
356     Enumeration e = null;
357     try {
358       e = NetworkInterface.getNetworkInterfaces();
359     } catch (SocketException e1) {
360       _logger.error("Failed to get list of interfaces", e1);
361       //throw new RuntimeException("Failed to acquire list of interfaces", e1);
362       return null;
363     }
364     while (e.hasMoreElements()) {
365
366       NetworkInterface n = (NetworkInterface) e.nextElement();
367
368       Enumeration ee = n.getInetAddresses();
369       while (ee.hasMoreElements()) {
370         InetAddress i = (InetAddress) ee.nextElement();
371         _logger.debug("Trying address {}", i);
372         if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) {
373           String hostAddress = i.getHostAddress();
374           _logger.debug("Settled on host address {}", hostAddress);
375           return hostAddress;
376         }
377       }
378     }
379
380     _logger.error("Failed to find a suitable host address");
381     return null;
382   }
383
384   /**
385    * Listener for rpc registrations
386    */
387   private class RpcListener implements RpcRegistrationListener {
388
389     @Override
390     public void onRpcImplementationAdded(QName name) {
391
392       //if the service name exists in the set, this notice
393       //has bounced back from the broker. It should be ignored
394       if (remoteServices.contains(name))
395         return;
396
397       _logger.debug("Adding registration for [{}]", name);
398       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
399       routeId.setType(name);
400
401       RoutingTable<String, String> routingTable = getRoutingTable();
402
403       try {
404         routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
405         _logger.debug("Route added [{}-{}]", name, getServerAddress());
406
407       } catch (RoutingTableException | SystemException e) {
408         //TODO: This can be thrown when route already exists in the table. Broker
409         //needs to handle this.
410         _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
411
412       }
413     }
414
415     @Override
416     public void onRpcImplementationRemoved(QName name) {
417
418       _logger.debug("Removing registration for [{}]", name);
419       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
420       routeId.setType(name);
421
422       RoutingTable<String, String> routingTable = getRoutingTable();
423
424       try {
425         routingTable.removeGlobalRoute(routeId.toString());
426       } catch (RoutingTableException | SystemException e) {
427         _logger.error("Route delete failed {}", e);
428       }
429     }
430
431     private RoutingTable<String, String> getRoutingTable(){
432       Optional<RoutingTable<String, String>> routingTable =
433           routingTableProvider.getRoutingTable();
434
435       checkNotNull(routingTable.isPresent(), "Routing table is null");
436
437       return routingTable.get();
438     }
439   }
440
441   /*
442    * Listener for Route changes in broker. Broker notifies this listener in the event
443    * of any change (add/delete). Listener then updates the routing table.
444    */
445   private class BrokerRouteChangeListener
446       implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
447
448     @Override
449     public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
450
451     }
452   }
453
454 }