Updating the commons parent pom to 1.4.1-SNAPSHOT and checkstyle fixes
[netvirt.git] / ovsdb / src / main / java / org / opendaylight / ovsdb / plugin / ConnectionService.java
1 /*
2  * Copyright (C) 2013 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
9  */
10 package org.opendaylight.ovsdb.plugin;
11
12 import io.netty.bootstrap.Bootstrap;
13 import io.netty.bootstrap.ServerBootstrap;
14 import io.netty.channel.AdaptiveRecvByteBufAllocator;
15 import io.netty.channel.Channel;
16 import io.netty.channel.ChannelFuture;
17 import io.netty.channel.ChannelHandler;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.nio.NioEventLoopGroup;
22 import io.netty.channel.socket.SocketChannel;
23 import io.netty.channel.socket.nio.NioServerSocketChannel;
24 import io.netty.channel.socket.nio.NioSocketChannel;
25 import io.netty.handler.codec.string.StringEncoder;
26 import io.netty.handler.logging.LogLevel;
27 import io.netty.handler.logging.LoggingHandler;
28 import io.netty.util.CharsetUtil;
29
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.UnknownHostException;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.HashSet;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Set;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentMap;
41 import java.util.concurrent.ExecutionException;
42
43 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
44 import org.opendaylight.controller.sal.connection.ConnectionConstants;
45 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
46 import org.opendaylight.controller.sal.core.Node;
47 import org.opendaylight.controller.sal.core.Property;
48 import org.opendaylight.controller.sal.utils.ServiceHelper;
49 import org.opendaylight.controller.sal.utils.Status;
50 import org.opendaylight.controller.sal.utils.StatusCode;
51 import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
52 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
53 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
54 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
55 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
56 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
57 import org.opendaylight.ovsdb.lib.message.TableUpdates;
58 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
59 import org.opendaylight.ovsdb.lib.table.Bridge;
60 import org.opendaylight.ovsdb.lib.table.Controller;
61 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
62 import org.opendaylight.ovsdb.lib.table.internal.Table;
63 import org.opendaylight.ovsdb.lib.table.internal.Tables;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 import com.fasterxml.jackson.annotation.JsonInclude.Include;
68 import com.fasterxml.jackson.databind.DeserializationFeature;
69 import com.fasterxml.jackson.databind.ObjectMapper;
70 import com.google.common.util.concurrent.ListenableFuture;
71
72
73 /**
74  * Represents the openflow plugin component in charge of programming the flows
75  * the flow programming and relay them to functional modules above SAL.
76  */
77 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
78     protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
79
80     private static final Integer defaultOvsdbPort = 6640;
81     private static Integer ovsdbListenPort = defaultOvsdbPort;
82     private ConcurrentMap<String, Connection> ovsdbConnections;
83     private List<ChannelHandler> handlers = null;
84     private InventoryServiceInternal inventoryServiceInternal;
85     private Channel serverListenChannel = null;
86
87     public InventoryServiceInternal getInventoryServiceInternal() {
88         return inventoryServiceInternal;
89     }
90
91     public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
92         this.inventoryServiceInternal = inventoryServiceInternal;
93     }
94
95     public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
96         if (this.inventoryServiceInternal == inventoryServiceInternal) {
97             this.inventoryServiceInternal = null;
98         }
99     }
100
101     public void init() {
102         ovsdbConnections = new ConcurrentHashMap<String, Connection>();
103         int listenPort = defaultOvsdbPort;
104         String portString = System.getProperty("ovsdb.listenPort");
105         if (portString != null) {
106             listenPort = Integer.decode(portString);
107         }
108         ovsdbListenPort = listenPort;
109     }
110
111     /**
112      * Function called by the dependency manager when at least one dependency
113      * become unsatisfied or when the component is shutting down because for
114      * example bundle is being stopped.
115      */
116     void destroy() {
117     }
118
119     /**
120      * Function called by dependency manager after "init ()" is called and after
121      * the services provided by the class are registered in the service registry
122      */
123     void start() {
124         startOvsdbManager();
125     }
126
127     /**
128      * Function called by the dependency manager before the services exported by
129      * the component are unregistered, this will be followed by a "destroy ()"
130      * calls
131      */
132     void stopping() {
133         for (Connection connection : ovsdbConnections.values()) {
134             connection.disconnect();
135         }
136         serverListenChannel.disconnect();
137     }
138
139     @Override
140     public Status disconnect(Node node) {
141         String identifier = (String) node.getID();
142         Connection connection = ovsdbConnections.get(identifier);
143         if (connection != null) {
144             ovsdbConnections.remove(identifier);
145             return connection.disconnect();
146         } else {
147             return new Status(StatusCode.NOTFOUND);
148         }
149     }
150
151     @Override
152     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
153         InetAddress address;
154         Integer port;
155
156         try {
157             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
158         } catch (Exception e) {
159             logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
160             return null;
161         }
162
163         try {
164             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
165             if (port == 0) port = defaultOvsdbPort;
166         } catch (Exception e) {
167             port = defaultOvsdbPort;
168         }
169
170         try {
171             Bootstrap bootstrap = new Bootstrap();
172             bootstrap.group(new NioEventLoopGroup());
173             bootstrap.channel(NioSocketChannel.class);
174             bootstrap.option(ChannelOption.TCP_NODELAY, true);
175             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
176
177             bootstrap.handler(new ChannelInitializer<SocketChannel>() {
178                 @Override
179                 public void initChannel(SocketChannel channel) throws Exception {
180                     if (handlers == null) {
181                         channel.pipeline().addLast(
182                                 //new LoggingHandler(LogLevel.INFO),
183                                 new JsonRpcDecoder(100000),
184                                 new StringEncoder(CharsetUtil.UTF_8));
185                     } else {
186                         for (ChannelHandler handler : handlers) {
187                             channel.pipeline().addLast(handler);
188                         }
189                     }
190                 }
191             });
192
193             ChannelFuture future = bootstrap.connect(address, port).sync();
194             Channel channel = future.channel();
195             return handleNewConnection(identifier, channel, this);
196         } catch (InterruptedException e) {
197             logger.error("Thread was interrupted during connect", e);
198         } catch (ExecutionException e) {
199             logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
200         }
201         return null;
202     }
203
204     public List<ChannelHandler> getHandlers() {
205         return handlers;
206     }
207
208     public void setHandlers(List<ChannelHandler> handlers) {
209         this.handlers = handlers;
210     }
211
212     @Override
213     public Connection getConnection(Node node) {
214         String identifier = (String) node.getID();
215         return ovsdbConnections.get(identifier);
216     }
217
218     @Override
219     public List<Node> getNodes() {
220         List<Node> nodes = new ArrayList<Node>();
221         for (Connection connection : ovsdbConnections.values()) {
222             nodes.add(connection.getNode());
223         }
224         return nodes;
225     }
226
227     @Override
228     public void notifyClusterViewChanged() {
229     }
230
231     @Override
232     public void notifyNodeDisconnectFromMaster(Node arg0) {
233     }
234
235     private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
236         Connection connection = new Connection(identifier, channel);
237         Node node = connection.getNode();
238
239         ObjectMapper objectMapper = new ObjectMapper();
240         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
241         objectMapper.setSerializationInclusion(Include.NON_NULL);
242
243         JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
244         JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
245         binderHandler.setNode(node);
246         channel.pipeline().addLast(binderHandler);
247
248         OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
249         connection.setRpc(ovsdb);
250         ovsdb.registerCallback(instance);
251         ovsdbConnections.put(identifier, connection);
252
253         ChannelConnectionHandler handler = new ChannelConnectionHandler();
254         handler.setNode(node);
255         handler.setConnectionService(this);
256         ChannelFuture closeFuture = channel.closeFuture();
257         closeFuture.addListener(handler);
258         // Keeping the Initial inventory update(s) on its own thread.
259         new Thread() {
260             Connection connection;
261             String identifier;
262
263             @Override
264             public void run() {
265                 try {
266                     initializeInventoryForNewNode(connection);
267                 } catch (InterruptedException | ExecutionException e) {
268                     logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
269                     ovsdbConnections.remove(identifier);
270                 }
271             }
272             public Thread initializeConnectionParams(String identifier, Connection connection) {
273                 this.identifier = identifier;
274                 this.connection = connection;
275                 return this;
276             }
277         }.initializeConnectionParams(identifier, connection).start();
278         return node;
279     }
280
281     public void channelClosed(Node node) throws Exception {
282         logger.info("Connection to Node : {} closed", node);
283         inventoryServiceInternal.removeNode(node);
284     }
285
286     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
287         Channel channel = connection.getChannel();
288         InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
289         int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
290         IPAddressProperty addressProp = new IPAddressProperty(address);
291         L4PortProperty l4Port = new L4PortProperty(port);
292         Set<Property> props = new HashSet<Property>();
293         props.add(addressProp);
294         props.add(l4Port);
295         inventoryServiceInternal.addNode(connection.getNode(), props);
296
297         List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
298         ListenableFuture<DatabaseSchema> dbSchemaF = connection.getRpc().get_schema(dbNames);
299         DatabaseSchema databaseSchema = dbSchemaF.get();
300         inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
301
302         MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
303         for (Table<?> table : Tables.getTables()) {
304             if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
305                 monitorReq.monitor(table);
306             } else {
307                 logger.warn("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
308             }
309         }
310
311         ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
312         TableUpdates updates = monResponse.get();
313         if (updates.getError() != null) {
314             logger.error("Error configuring monitor, error : {}, details : {}",
315                     updates.getError(),
316                     updates.getDetails());
317             /* FIXME: This should be cause for alarm */
318             throw new RuntimeException("Failed to setup a monitor in OVSDB");
319         }
320         UpdateNotification monitor = new UpdateNotification();
321         monitor.setUpdate(updates);
322         this.update(connection.getNode(), monitor);
323         // With the existing bridges learnt, now it is time to update the OF Controller connections.
324         this.updateOFControllers(connection.getNode());
325         inventoryServiceInternal.notifyNodeAdded(connection.getNode());
326     }
327
328     private void startOvsdbManager() {
329         new Thread() {
330             @Override
331             public void run() {
332                 ovsdbManager();
333             }
334         }.start();
335     }
336
337     private void ovsdbManager() {
338         EventLoopGroup bossGroup = new NioEventLoopGroup();
339         EventLoopGroup workerGroup = new NioEventLoopGroup();
340         try {
341             ServerBootstrap b = new ServerBootstrap();
342             b.group(bossGroup, workerGroup)
343              .channel(NioServerSocketChannel.class)
344              .option(ChannelOption.SO_BACKLOG, 100)
345              .handler(new LoggingHandler(LogLevel.INFO))
346              .childHandler(new ChannelInitializer<SocketChannel>() {
347                  @Override
348                  public void initChannel(SocketChannel channel) throws Exception {
349                      logger.debug("New Passive channel created : "+ channel.toString());
350                      InetAddress address = channel.remoteAddress().getAddress();
351                      int port = channel.remoteAddress().getPort();
352                      String identifier = address.getHostAddress()+":"+port;
353                      channel.pipeline().addLast(
354                              new LoggingHandler(LogLevel.INFO),
355                              new JsonRpcDecoder(100000),
356                              new StringEncoder(CharsetUtil.UTF_8));
357
358                      Node node = handleNewConnection(identifier, channel, ConnectionService.this);
359                      logger.debug("Connected Node : "+node.toString());
360                  }
361              });
362             b.option(ChannelOption.TCP_NODELAY, true);
363             b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
364             // Start the server.
365             ChannelFuture f = b.bind(ovsdbListenPort).sync();
366             serverListenChannel =  f.channel();
367             // Wait until the server socket is closed.
368             serverListenChannel.closeFuture().sync();
369         } catch (InterruptedException e) {
370             logger.error("Thread interrupted", e);
371         } finally {
372             // Shut down all event loops to terminate all threads.
373             bossGroup.shutdownGracefully();
374             workerGroup.shutdownGracefully();
375         }
376     }
377
378     private IClusterGlobalServices clusterServices;
379
380     public void setClusterServices(IClusterGlobalServices i) {
381         this.clusterServices = i;
382     }
383
384     public void unsetClusterServices(IClusterGlobalServices i) {
385         if (this.clusterServices == i) {
386             this.clusterServices = null;
387         }
388     }
389
390     private List<InetAddress> getControllerIPAddresses(Connection connection) {
391         List<InetAddress> controllers = null;
392         InetAddress controllerIP = null;
393
394         controllers = new ArrayList<InetAddress>();
395         String addressString = System.getProperty("ovsdb.controller.address");
396
397         if (addressString != null) {
398             try {
399                 controllerIP = InetAddress.getByName(addressString);
400                 if (controllerIP != null) {
401                     controllers.add(controllerIP);
402                     return controllers;
403                 }
404             } catch (UnknownHostException e) {
405                 logger.error("Host {} is invalid", addressString);
406             }
407         }
408
409         if (clusterServices != null) {
410             controllers = clusterServices.getClusteredControllers();
411             if (controllers != null && controllers.size() > 0) {
412                 if (controllers.size() == 1) {
413                     InetAddress controller = controllers.get(0);
414                     if (!controller.equals(InetAddress.getLoopbackAddress())) {
415                         return controllers;
416                     }
417                 } else {
418                     return controllers;
419                 }
420             }
421         }
422
423         addressString = System.getProperty("of.address");
424
425         if (addressString != null) {
426             try {
427                 controllerIP = InetAddress.getByName(addressString);
428                 if (controllerIP != null) {
429                     controllers.add(controllerIP);
430                     return controllers;
431                 }
432             } catch (UnknownHostException e) {
433                 logger.error("Host {} is invalid", addressString);
434             }
435         }
436
437         try {
438             controllerIP = ((InetSocketAddress)connection.getChannel().localAddress()).getAddress();
439             controllers.add(controllerIP);
440             return controllers;
441         } catch (Exception e) {
442             logger.debug("Invalid connection provided to getControllerIPAddresses", e);
443         }
444         return controllers;
445     }
446
447     private short getControllerOFPort() {
448         Short defaultOpenFlowPort = 6633;
449         Short openFlowPort = defaultOpenFlowPort;
450         String portString = System.getProperty("of.listenPort");
451         if (portString != null) {
452             try {
453                 openFlowPort = Short.decode(portString);
454             } catch (NumberFormatException e) {
455                 logger.warn("Invalid port:{}, use default({})", portString,
456                         openFlowPort);
457             }
458         }
459         return openFlowPort;
460     }
461
462     @Override
463     public Boolean setOFController(Node node, String bridgeUUID) throws InterruptedException, ExecutionException {
464         Connection connection = this.getConnection(node);
465         if (connection == null) {
466             return false;
467         }
468
469         if (connection != null) {
470             List<InetAddress> ofControllerAddrs = this.getControllerIPAddresses(connection);
471             short ofControllerPort = getControllerOFPort();
472             for (InetAddress ofControllerAddress : ofControllerAddrs) {
473                 String newController = "tcp:"+ofControllerAddress.getHostAddress()+":"+ofControllerPort;
474                 Controller controllerRow = new Controller();
475                 controllerRow.setTarget(newController);
476                 OVSDBConfigService ovsdbTable = (OVSDBConfigService)ServiceHelper.getGlobalInstance(OVSDBConfigService.class, this);
477                 if (ovsdbTable != null) {
478                     ovsdbTable.insertRow(node, Controller.NAME.getName(), bridgeUUID, controllerRow);
479                 }
480             }
481         }
482         return true;
483     }
484
485     private void updateOFControllers (Node node) {
486         Map<String, Table<?>> bridges = inventoryServiceInternal.getTableCache(node, Bridge.NAME.getName());
487         if (bridges == null) return;
488         for (String bridgeUUID : bridges.keySet()) {
489             try {
490                 this.setOFController(node, bridgeUUID);
491             } catch (Exception e) {
492                 logger.error("Failed updateOFControllers", e);
493             }
494         }
495     }
496
497     @Override
498     public void update(Node node, UpdateNotification updateNotification) {
499         if (updateNotification == null) return;
500         inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());
501     }
502
503     @Override
504     public void locked(Node node, List<String> ids) {
505         // TODO Auto-generated method stub
506     }
507
508     @Override
509     public void stolen(Node node, List<String> ids) {
510         // TODO Auto-generated method stub
511     }
512
513 }