Merge "Changed codec for Identityref in JSON transformation"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Predicate;
13 import com.google.common.collect.Sets;
14 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
18 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
19 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
21 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
22 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.zeromq.ZMQ;
28
29 import java.net.Inet4Address;
30 import java.net.InetAddress;
31 import java.net.NetworkInterface;
32 import java.net.SocketException;
33 import java.util.Enumeration;
34 import java.util.HashSet;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.FutureTask;
40 import java.util.concurrent.TimeUnit;
41
42 import static com.google.common.base.Preconditions.checkNotNull;
43
44 /**
45  * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
46  * so that it gets route change notifications from routing table.
47  */
48 public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
49
50   private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
51
52   private ExecutorService serverPool;
53   protected ServerRequestHandler handler;
54
55   private Set<QName> remoteServices;
56   private ProviderSession brokerSession;
57   private ZMQ.Context context;
58
59   private final RpcListener listener = new RpcListener();
60
61   private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
62   private final int HANDLER_WORKER_COUNT = 2;
63   private final int HWM = 200;//high water mark on sockets
64   private volatile State status = State.STOPPED;
65
66   private String serverAddress;
67   private int port;
68
69   private ClientImpl client;
70
71   private  RoutingTableProvider routingTableProvider;
72
73   public static enum State {
74     STARTING, STARTED, STOPPED;
75   }
76
77   public ServerImpl(int port) {
78     this.port = port;
79     this.serverAddress = new StringBuilder(findIpAddress()).
80                               append(":").
81                               append(port).
82                               toString();
83   }
84
85   public RoutingTableProvider getRoutingTableProvider() {
86     return routingTableProvider;
87   }
88
89   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
90     this.routingTableProvider = routingTableProvider;
91   }
92
93   public ClientImpl getClient(){
94     return this.client;
95   }
96
97   public void setClient(ClientImpl client) {
98     this.client = client;
99   }
100
101   public State getStatus() {
102     return this.status;
103   }
104
105   public Optional<ServerRequestHandler> getHandler() {
106     return Optional.fromNullable(this.handler);
107   }
108
109   public void setBrokerSession(ProviderSession session) {
110     this.brokerSession = session;
111   }
112
113   public Optional<ProviderSession> getBrokerSession() {
114     return Optional.fromNullable(this.brokerSession);
115   }
116
117   public Optional<ZMQ.Context> getZmqContext() {
118     return Optional.fromNullable(this.context);
119   }
120
121   public String getServerAddress() {
122     return serverAddress;
123   }
124
125   public String getHandlerAddress() {
126     return HANDLER_INPROC_ADDRESS;
127   }
128
129   /**
130    *
131    */
132   public void start() {
133     Preconditions.checkState(State.STOPPED == this.getStatus(),
134         "Remote RPC Server is already running");
135
136     status = State.STARTING;
137     context = ZMQ.context(1);
138     remoteServices = new HashSet<QName>();//
139     serverPool = Executors.newSingleThreadExecutor();//main server thread
140     serverPool.execute(receive()); // Start listening rpc requests
141     brokerSession.addRpcRegistrationListener(listener);
142
143     announceLocalRpcs();
144
145     registerRemoteRpcs();
146
147     status = State.STARTED;
148     _logger.info("Remote RPC Server started [{}]", getServerAddress());
149   }
150
151   public void stop(){
152     close();
153   }
154
155   /**
156    *
157    */
158   @Override
159   public void close() {
160
161     if (State.STOPPED == this.getStatus()) return; //do nothing
162
163     unregisterLocalRpcs();
164
165     if (serverPool != null)
166       serverPool.shutdown();
167
168     closeZmqContext();
169
170     status = State.STOPPED;
171     _logger.info("Remote RPC Server stopped");
172   }
173
174   /**
175    * Closes ZMQ Context. It tries to gracefully terminate the context. If
176    * termination takes more than a second, its forcefully shutdown.
177    */
178   private void closeZmqContext() {
179     ExecutorService exec = Executors.newSingleThreadExecutor();
180     FutureTask zmqTermination = new FutureTask(new Runnable() {
181
182       @Override
183       public void run() {
184         try {
185           if (context != null)
186             context.term();
187           _logger.debug("ZMQ Context terminated gracefully!");
188         } catch (Exception e) {
189           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
190         }
191       }
192     }, null);
193
194     exec.execute(zmqTermination);
195
196     try {
197       zmqTermination.get(5L, TimeUnit.SECONDS);
198     } catch (Exception e) {/*ignore and continue with shutdown*/}
199
200     exec.shutdownNow();
201   }
202
203   /**
204    * Main listener thread that spawns {@link ServerRequestHandler} as workers.
205    *
206    * @return
207    */
208   private Runnable receive() {
209     return new Runnable() {
210
211       @Override
212       public void run() {
213         Thread.currentThread().setName("remote-rpc-server");
214         _logger.debug("Remote RPC Server main thread starting...");
215
216         //socket clients connect to (frontend)
217         ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
218
219         //socket RequestHandlers connect to (backend)
220         ZMQ.Socket workers = context.socket(ZMQ.DEALER);
221
222         try (SocketPair capturePair = new SocketPair();
223              ServerRequestHandler requestHandler = new ServerRequestHandler(context,
224                  brokerSession,
225                  HANDLER_WORKER_COUNT,
226                  HANDLER_INPROC_ADDRESS,
227                  getServerAddress());) {
228
229           handler = requestHandler;
230           clients.setHWM(HWM);
231           clients.bind("tcp://*:" + port);
232           workers.setHWM(HWM);
233           workers.bind(HANDLER_INPROC_ADDRESS);
234           //start worker threads
235           _logger.debug("Remote RPC Server worker threads starting...");
236           requestHandler.start();
237           //start capture thread
238           // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
239           //  Connect work threads to client threads via a queue
240           ZMQ.proxy(clients, workers, null);//capturePair.getSender());
241
242         } catch (Exception e) {
243           _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
244         } finally {
245           if (clients != null) clients.close();
246           if (workers != null) workers.close();
247           _logger.info("Remote RPC Server stopped");
248         }
249       }
250     };
251   }
252
253   /**
254    * Register the remote RPCs from the routing table into broker
255    */
256   private void registerRemoteRpcs(){
257     Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
258
259     Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
260
261     Set<Map.Entry> remoteRoutes =
262             routingTableProvider.getRoutingTable().get().getAllRoutes();
263
264     //filter out all entries that contains local address
265     //we dont want to register local RPCs as remote
266     Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
267       public boolean apply(Map.Entry remoteRoute){
268         return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
269       }
270     };
271
272     //filter the entries created by current node
273     Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
274
275     for (Map.Entry route : filteredRemoteRoutes){
276       onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
277     }
278   }
279
280   /**
281    * Un-Register the local RPCs from the routing table
282    */
283   private void unregisterLocalRpcs(){
284     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
285     for (QName rpc : currentlySupported) {
286       listener.onRpcImplementationRemoved(rpc);
287     }
288   }
289
290   /**
291    * Publish all the locally registered RPCs in the routing table
292    */
293   private void announceLocalRpcs(){
294     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
295     for (QName rpc : currentlySupported) {
296       listener.onRpcImplementationAdded(rpc);
297     }
298   }
299
300   /**
301    * @param key
302    * @param value
303    */
304   @Override
305   public void onRouteUpdated(String key, String value) {
306     RouteIdentifierImpl rId = new RouteIdentifierImpl();
307     try {
308       _logger.debug("Updating key/value {}-{}", key, value);
309       brokerSession.addRpcImplementation(
310           (QName) rId.fromString(key).getType(), client);
311
312       //TODO: Check with Tony for routed rpc
313       //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
314     } catch (Exception e) {
315       _logger.info("Route update failed {}", e);
316     }
317   }
318
319   /**
320    * @param key
321    */
322   @Override
323   public void onRouteDeleted(String key) {
324     //TODO: Broker session needs to be updated to support this
325     throw new UnsupportedOperationException();
326   }
327
328   /**
329    * Finds IPv4 address of the local VM
330    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
331    * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
332    * Should we use IP or hostname?
333    *
334    * @return
335    */
336   private String findIpAddress() {
337     String hostAddress = null;
338     Enumeration e = null;
339     try {
340       e = NetworkInterface.getNetworkInterfaces();
341     } catch (SocketException e1) {
342       e1.printStackTrace();
343     }
344     while (e.hasMoreElements()) {
345
346       NetworkInterface n = (NetworkInterface) e.nextElement();
347
348       Enumeration ee = n.getInetAddresses();
349       while (ee.hasMoreElements()) {
350         InetAddress i = (InetAddress) ee.nextElement();
351         if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
352           hostAddress = i.getHostAddress();
353       }
354     }
355     return hostAddress;
356
357   }
358
359   /**
360    * Listener for rpc registrations
361    */
362   private class RpcListener implements RpcRegistrationListener {
363
364     @Override
365     public void onRpcImplementationAdded(QName name) {
366
367       //if the service name exists in the set, this notice
368       //has bounced back from the broker. It should be ignored
369       if (remoteServices.contains(name))
370         return;
371
372       _logger.debug("Adding registration for [{}]", name);
373       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
374       routeId.setType(name);
375
376       RoutingTable<String, String> routingTable = getRoutingTable();
377
378       try {
379         routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
380         _logger.debug("Route added [{}-{}]", name, getServerAddress());
381
382       } catch (RoutingTableException | SystemException e) {
383         //TODO: This can be thrown when route already exists in the table. Broker
384         //needs to handle this.
385         _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
386
387       }
388     }
389
390     @Override
391     public void onRpcImplementationRemoved(QName name) {
392
393       _logger.debug("Removing registration for [{}]", name);
394       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
395       routeId.setType(name);
396
397       RoutingTable<String, String> routingTable = getRoutingTable();
398
399       try {
400         routingTable.removeGlobalRoute(routeId.toString());
401       } catch (RoutingTableException | SystemException e) {
402         _logger.error("Route delete failed {}", e);
403       }
404     }
405
406     private RoutingTable<String, String> getRoutingTable(){
407       Optional<RoutingTable<String, String>> routingTable =
408           routingTableProvider.getRoutingTable();
409
410       checkNotNull(routingTable.isPresent(), "Routing table is null");
411
412       return routingTable.get();
413     }
414   }
415
416   /*
417    * Listener for Route changes in broker. Broker notifies this listener in the event
418    * of any change (add/delete). Listener then updates the routing table.
419    */
420   private class BrokerRouteChangeListener
421       implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
422
423     @Override
424     public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
425
426     }
427   }
428
429 }