Merge "Adding Pax-Exam infra with a basic IT for plugin"
[netvirt.git] / ovsdb / src / main / java / org / opendaylight / ovsdb / plugin / ConnectionService.java
1 /*
2  * Copyright (C) 2013 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
9  */
10 package org.opendaylight.ovsdb.plugin;
11
12 import io.netty.bootstrap.Bootstrap;
13 import io.netty.bootstrap.ServerBootstrap;
14 import io.netty.channel.AdaptiveRecvByteBufAllocator;
15 import io.netty.channel.Channel;
16 import io.netty.channel.ChannelFuture;
17 import io.netty.channel.ChannelHandler;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.nio.NioEventLoopGroup;
22 import io.netty.channel.socket.SocketChannel;
23 import io.netty.channel.socket.nio.NioServerSocketChannel;
24 import io.netty.channel.socket.nio.NioSocketChannel;
25 import io.netty.handler.codec.string.StringEncoder;
26 import io.netty.handler.logging.LogLevel;
27 import io.netty.handler.logging.LoggingHandler;
28 import io.netty.util.CharsetUtil;
29
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.UnknownHostException;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.HashSet;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Set;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentMap;
41 import java.util.concurrent.ExecutionException;
42
43 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
44 import org.opendaylight.controller.sal.connection.ConnectionConstants;
45 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
46 import org.opendaylight.controller.sal.core.Node;
47 import org.opendaylight.controller.sal.core.Property;
48 import org.opendaylight.controller.sal.utils.ServiceHelper;
49 import org.opendaylight.controller.sal.utils.Status;
50 import org.opendaylight.controller.sal.utils.StatusCode;
51 import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
52 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
53 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
54 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
55 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
56 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
57 import org.opendaylight.ovsdb.lib.message.TableUpdates;
58 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
59 import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
60 import org.opendaylight.ovsdb.lib.table.Bridge;
61 import org.opendaylight.ovsdb.lib.table.Controller;
62 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
63 import org.opendaylight.ovsdb.lib.table.internal.Table;
64 import org.opendaylight.ovsdb.lib.table.internal.Tables;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 import com.fasterxml.jackson.annotation.JsonInclude.Include;
69 import com.fasterxml.jackson.databind.DeserializationFeature;
70 import com.fasterxml.jackson.databind.ObjectMapper;
71 import com.google.common.util.concurrent.ListenableFuture;
72
73
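/**
 * Connection service of the OVSDB plugin. It opens active connections to
 * OVSDB servers, listens for passive connections on the configured OVSDB
 * port, keeps track of the resulting connections by identifier and seeds the
 * inventory for every newly connected node.
 */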
78 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
79     protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
80
81     // Properties that can be set in config.ini
82     private static final String OVSDB_LISTENPORT = "ovsdb.listenPort";
83     private static final String OVSDB_AUTOCONFIGURECONTROLLER = "ovsdb.autoconfigurecontroller";
84     protected static final String OPENFLOW_10 = "1.0";
85     protected static final String OPENFLOW_13 = "1.3";
86
87     private static final Integer defaultOvsdbPort = 6640;
88     private static final boolean defaultAutoConfigureController = true;
89
90     private static Integer ovsdbListenPort = defaultOvsdbPort;
91     private static boolean autoConfigureController = defaultAutoConfigureController;
92     private ConcurrentMap<String, Connection> ovsdbConnections;
93     private List<ChannelHandler> handlers = null;
94     private InventoryServiceInternal inventoryServiceInternal;
95     private Channel serverListenChannel = null;
96
97     public InventoryServiceInternal getInventoryServiceInternal() {
98         return inventoryServiceInternal;
99     }
100
101     public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
102         this.inventoryServiceInternal = inventoryServiceInternal;
103     }
104
105     public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
106         if (this.inventoryServiceInternal == inventoryServiceInternal) {
107             this.inventoryServiceInternal = null;
108         }
109     }
110
111     public void init() {
112         ovsdbConnections = new ConcurrentHashMap<String, Connection>();
113         int listenPort = defaultOvsdbPort;
114         String portString = System.getProperty(OVSDB_LISTENPORT);
115         if (portString != null) {
116             listenPort = Integer.decode(portString).intValue();
117         }
118         ovsdbListenPort = listenPort;
119
120         // Keep the default value if the property is not set
121         if (System.getProperty(OVSDB_AUTOCONFIGURECONTROLLER) != null)
122             autoConfigureController = Boolean.getBoolean(OVSDB_AUTOCONFIGURECONTROLLER);
123     }
124
125     /**
126      * Function called by the dependency manager when at least one dependency
127      * become unsatisfied or when the component is shutting down because for
128      * example bundle is being stopped.
129      */
130     void destroy() {
131     }
132
133     /**
134      * Function called by dependency manager after "init ()" is called and after
135      * the services provided by the class are registered in the service registry
136      */
137     void start() {
138         startOvsdbManager();
139     }
140
141     /**
142      * Function called by the dependency manager before the services exported by
143      * the component are unregistered, this will be followed by a "destroy ()"
144      * calls
145      */
146     void stopping() {
147         for (Connection connection : ovsdbConnections.values()) {
148             connection.disconnect();
149         }
150         serverListenChannel.disconnect();
151     }
152
153     @Override
154     public Status disconnect(Node node) {
155         String identifier = (String) node.getID();
156         Connection connection = ovsdbConnections.get(identifier);
157         if (connection != null) {
158             ovsdbConnections.remove(identifier);
159             return connection.disconnect();
160         } else {
161             return new Status(StatusCode.NOTFOUND);
162         }
163     }
164
165     @Override
166     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
167         InetAddress address;
168         Integer port;
169
170         try {
171             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
172         } catch (Exception e) {
173             logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
174             return null;
175         }
176
177         try {
178             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
179             if (port == 0) port = defaultOvsdbPort;
180         } catch (Exception e) {
181             port = defaultOvsdbPort;
182         }
183
184         try {
185             Bootstrap bootstrap = new Bootstrap();
186             bootstrap.group(new NioEventLoopGroup());
187             bootstrap.channel(NioSocketChannel.class);
188             bootstrap.option(ChannelOption.TCP_NODELAY, true);
189             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
190
191             bootstrap.handler(new ChannelInitializer<SocketChannel>() {
192                 @Override
193                 public void initChannel(SocketChannel channel) throws Exception {
194                     if (handlers == null) {
195                         channel.pipeline().addLast(
196                                 //new LoggingHandler(LogLevel.INFO),
197                                 new JsonRpcDecoder(100000),
198                                 new StringEncoder(CharsetUtil.UTF_8));
199                     } else {
200                         for (ChannelHandler handler : handlers) {
201                             channel.pipeline().addLast(handler);
202                         }
203                     }
204                 }
205             });
206
207             ChannelFuture future = bootstrap.connect(address, port).sync();
208             Channel channel = future.channel();
209             return handleNewConnection(identifier, channel, this);
210         } catch (InterruptedException e) {
211             logger.error("Thread was interrupted during connect", e);
212         } catch (ExecutionException e) {
213             logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
214         }
215         return null;
216     }
217
218     public List<ChannelHandler> getHandlers() {
219         return handlers;
220     }
221
222     public void setHandlers(List<ChannelHandler> handlers) {
223         this.handlers = handlers;
224     }
225
226     @Override
227     public Connection getConnection(Node node) {
228         String identifier = (String) node.getID();
229         return ovsdbConnections.get(identifier);
230     }
231
232     @Override
233     public List<Node> getNodes() {
234         List<Node> nodes = new ArrayList<Node>();
235         for (Connection connection : ovsdbConnections.values()) {
236             nodes.add(connection.getNode());
237         }
238         return nodes;
239     }
240
241     @Override
242     public void notifyClusterViewChanged() {
243     }
244
245     @Override
246     public void notifyNodeDisconnectFromMaster(Node arg0) {
247     }
248
249     private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
250         Connection connection = new Connection(identifier, channel);
251         Node node = connection.getNode();
252
253         ObjectMapper objectMapper = new ObjectMapper();
254         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
255         objectMapper.setSerializationInclusion(Include.NON_NULL);
256
257         JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
258         JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
259         binderHandler.setNode(node);
260         channel.pipeline().addLast(binderHandler);
261
262         OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
263         connection.setRpc(ovsdb);
264         ovsdb.registerCallback(instance);
265         ovsdbConnections.put(identifier, connection);
266
267         ChannelConnectionHandler handler = new ChannelConnectionHandler();
268         handler.setNode(node);
269         handler.setConnectionService(this);
270         ChannelFuture closeFuture = channel.closeFuture();
271         closeFuture.addListener(handler);
272         // Keeping the Initial inventory update(s) on its own thread.
273         new Thread() {
274             Connection connection;
275             String identifier;
276
277             @Override
278             public void run() {
279                 try {
280                     initializeInventoryForNewNode(connection);
281                 } catch (InterruptedException | ExecutionException e) {
282                     logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
283                     ovsdbConnections.remove(identifier);
284                 }
285             }
286             public Thread initializeConnectionParams(String identifier, Connection connection) {
287                 this.identifier = identifier;
288                 this.connection = connection;
289                 return this;
290             }
291         }.initializeConnectionParams(identifier, connection).start();
292         return node;
293     }
294
295     public void channelClosed(Node node) throws Exception {
296         logger.info("Connection to Node : {} closed", node);
297         disconnect(node);
298         inventoryServiceInternal.removeNode(node);
299     }
300
301     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
302         Channel channel = connection.getChannel();
303         InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
304         int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
305         IPAddressProperty addressProp = new IPAddressProperty(address);
306         L4PortProperty l4Port = new L4PortProperty(port);
307         Set<Property> props = new HashSet<Property>();
308         props.add(addressProp);
309         props.add(l4Port);
310         inventoryServiceInternal.addNode(connection.getNode(), props);
311
312         List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
313         ListenableFuture<DatabaseSchema> dbSchemaF = connection.getRpc().get_schema(dbNames);
314         DatabaseSchema databaseSchema = dbSchemaF.get();
315         inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
316
317         MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
318         for (Table<?> table : Tables.getTables()) {
319             if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
320                 monitorReq.monitor(table);
321             } else {
322                 logger.debug("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
323             }
324         }
325
326         ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
327         TableUpdates updates = monResponse.get();
328         if (updates.getError() != null) {
329             logger.error("Error configuring monitor, error : {}, details : {}",
330                     updates.getError(),
331                     updates.getDetails());
332             /* FIXME: This should be cause for alarm */
333             throw new RuntimeException("Failed to setup a monitor in OVSDB");
334         }
335         UpdateNotification monitor = new UpdateNotification();
336         monitor.setUpdate(updates);
337         this.update(connection.getNode(), monitor);
338         if (autoConfigureController) {
339             this.updateOFControllers(connection.getNode());
340         }
341         inventoryServiceInternal.notifyNodeAdded(connection.getNode());
342     }
343
344     private void startOvsdbManager() {
345         new Thread() {
346             @Override
347             public void run() {
348                 ovsdbManager();
349             }
350         }.start();
351     }
352
353     private void ovsdbManager() {
354         EventLoopGroup bossGroup = new NioEventLoopGroup();
355         EventLoopGroup workerGroup = new NioEventLoopGroup();
356         try {
357             ServerBootstrap b = new ServerBootstrap();
358             b.group(bossGroup, workerGroup)
359              .channel(NioServerSocketChannel.class)
360              .option(ChannelOption.SO_BACKLOG, 100)
361              .handler(new LoggingHandler(LogLevel.INFO))
362              .childHandler(new ChannelInitializer<SocketChannel>() {
363                  @Override
364                  public void initChannel(SocketChannel channel) throws Exception {
365                      logger.debug("New Passive channel created : "+ channel.toString());
366                      InetAddress address = channel.remoteAddress().getAddress();
367                      int port = channel.remoteAddress().getPort();
368                      String identifier = address.getHostAddress()+":"+port;
369                      channel.pipeline().addLast(
370                              new LoggingHandler(LogLevel.INFO),
371                              new JsonRpcDecoder(100000),
372                              new StringEncoder(CharsetUtil.UTF_8));
373
374                      Node node = handleNewConnection(identifier, channel, ConnectionService.this);
375                      logger.debug("Connected Node : "+node.toString());
376                  }
377              });
378             b.option(ChannelOption.TCP_NODELAY, true);
379             b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
380             // Start the server.
381             ChannelFuture f = b.bind(ovsdbListenPort).sync();
382             serverListenChannel =  f.channel();
383             // Wait until the server socket is closed.
384             serverListenChannel.closeFuture().sync();
385         } catch (InterruptedException e) {
386             logger.error("Thread interrupted", e);
387         } finally {
388             // Shut down all event loops to terminate all threads.
389             bossGroup.shutdownGracefully();
390             workerGroup.shutdownGracefully();
391         }
392     }
393
    // Cluster services handle injected through setClusterServices; consulted by
    // getControllerIPAddresses to obtain the clustered controller addresses.
    private IClusterGlobalServices clusterServices;
395
396     public void setClusterServices(IClusterGlobalServices i) {
397         this.clusterServices = i;
398     }
399
400     public void unsetClusterServices(IClusterGlobalServices i) {
401         if (this.clusterServices == i) {
402             this.clusterServices = null;
403         }
404     }
405
406     private List<InetAddress> getControllerIPAddresses(Connection connection) {
407         List<InetAddress> controllers = null;
408         InetAddress controllerIP = null;
409
410         controllers = new ArrayList<InetAddress>();
411         String addressString = System.getProperty("ovsdb.controller.address");
412
413         if (addressString != null) {
414             try {
415                 controllerIP = InetAddress.getByName(addressString);
416                 if (controllerIP != null) {
417                     controllers.add(controllerIP);
418                     return controllers;
419                 }
420             } catch (UnknownHostException e) {
421                 logger.error("Host {} is invalid", addressString);
422             }
423         }
424
425         if (clusterServices != null) {
426             controllers = clusterServices.getClusteredControllers();
427             if (controllers != null && controllers.size() > 0) {
428                 if (controllers.size() == 1) {
429                     InetAddress controller = controllers.get(0);
430                     if (!controller.equals(InetAddress.getLoopbackAddress())) {
431                         return controllers;
432                     }
433                 } else {
434                     return controllers;
435                 }
436             }
437         }
438
439         addressString = System.getProperty("of.address");
440
441         if (addressString != null) {
442             try {
443                 controllerIP = InetAddress.getByName(addressString);
444                 if (controllerIP != null) {
445                     controllers.add(controllerIP);
446                     return controllers;
447                 }
448             } catch (UnknownHostException e) {
449                 logger.error("Host {} is invalid", addressString);
450             }
451         }
452
453         try {
454             controllerIP = ((InetSocketAddress)connection.getChannel().localAddress()).getAddress();
455             controllers.add(controllerIP);
456             return controllers;
457         } catch (Exception e) {
458             logger.debug("Invalid connection provided to getControllerIPAddresses", e);
459         }
460         return controllers;
461     }
462
463     private short getControllerOFPort() {
464         Short defaultOpenFlowPort = 6633;
465         Short openFlowPort = defaultOpenFlowPort;
466         String portString = System.getProperty("of.listenPort");
467         if (portString != null) {
468             try {
469                 openFlowPort = Short.decode(portString).shortValue();
470             } catch (NumberFormatException e) {
471                 logger.warn("Invalid port:{}, use default({})", portString,
472                         openFlowPort);
473             }
474         }
475         return openFlowPort;
476     }
477
478     @Override
479     public Boolean setOFController(Node node, String bridgeUUID) throws InterruptedException, ExecutionException {
480         Connection connection = this.getConnection(node);
481         if (connection == null) {
482             return false;
483         }
484
485         OVSDBConfigService ovsdbTable = (OVSDBConfigService)ServiceHelper.getGlobalInstance(OVSDBConfigService.class, this);
486         OvsDBSet<String> protocols = new OvsDBSet<String>();
487
488         String ofVersion = System.getProperty("ovsdb.of.version", OPENFLOW_10);
489         switch (ofVersion) {
490             case OPENFLOW_13:
491                 protocols.add("OpenFlow13");
492                 break;
493             case OPENFLOW_10:
494             default:
495                 protocols.add("OpenFlow10");
496                 break;
497         }
498
499         Bridge bridge = new Bridge();
500         bridge.setProtocols(protocols);
501         Status status = ovsdbTable.updateRow(node, Bridge.NAME.getName(), null, bridgeUUID, bridge);
502         logger.debug("Bridge {} updated to {} with Status {}", bridgeUUID, protocols.toArray()[0], status);
503
504         List<InetAddress> ofControllerAddrs = this.getControllerIPAddresses(connection);
505         short ofControllerPort = getControllerOFPort();
506         for (InetAddress ofControllerAddress : ofControllerAddrs) {
507             String newController = "tcp:"+ofControllerAddress.getHostAddress()+":"+ofControllerPort;
508             Controller controllerRow = new Controller();
509             controllerRow.setTarget(newController);
510             if (ovsdbTable != null) {
511                 ovsdbTable.insertRow(node, Controller.NAME.getName(), bridgeUUID, controllerRow);
512             }
513         }
514         return true;
515     }
516
517     private void updateOFControllers (Node node) {
518         Map<String, Table<?>> bridges = inventoryServiceInternal.getTableCache(node, Bridge.NAME.getName());
519         if (bridges == null) return;
520         for (String bridgeUUID : bridges.keySet()) {
521             try {
522                 this.setOFController(node, bridgeUUID);
523             } catch (Exception e) {
524                 logger.error("Failed updateOFControllers", e);
525             }
526         }
527     }
528
529     @Override
530     public void update(Node node, UpdateNotification updateNotification) {
531         if (updateNotification == null) return;
532         inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());
533     }
534
535     @Override
536     public void locked(Node node, List<String> ids) {
537         // TODO Auto-generated method stub
538     }
539
540     @Override
541     public void stolen(Node node, List<String> ids) {
542         // TODO Auto-generated method stub
543     }
544
545 }