00deaa6514a15036637a91c62819595e6d86736a
[netvirt.git] / ovsdb / src / main / java / org / opendaylight / ovsdb / plugin / ConnectionService.java
1 package org.opendaylight.ovsdb.plugin;
2
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
import org.opendaylight.controller.sal.connection.ConnectionConstants;
import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.Property;
import org.opendaylight.controller.sal.utils.NetUtils;
import org.opendaylight.controller.sal.utils.ServiceHelper;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
import org.opendaylight.ovsdb.lib.message.TableUpdates;
import org.opendaylight.ovsdb.lib.message.UpdateNotification;
import org.opendaylight.ovsdb.lib.table.Bridge;
import org.opendaylight.ovsdb.lib.table.Controller;
import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
import org.opendaylight.ovsdb.lib.table.internal.Table;
import org.opendaylight.ovsdb.lib.table.internal.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
66
67
68 /**
69  * Represents the openflow plugin component in charge of programming the flows
70  * the flow programming and relay them to functional modules above SAL.
71  */
72 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
73     protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
74
75     private static final Integer defaultOvsdbPort = 6640;
76     private static Integer ovsdbListenPort = defaultOvsdbPort;
77     private ConcurrentMap<String, Connection> ovsdbConnections;
78     private List<ChannelHandler> handlers = null;
79     private InventoryServiceInternal inventoryServiceInternal;
80     private Channel serverListenChannel = null;
81
82     public InventoryServiceInternal getInventoryServiceInternal() {
83         return inventoryServiceInternal;
84     }
85
86     public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
87         this.inventoryServiceInternal = inventoryServiceInternal;
88     }
89
90     public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
91         if (this.inventoryServiceInternal == inventoryServiceInternal) {
92             this.inventoryServiceInternal = null;
93         }
94     }
95
96     public void init() {
97         ovsdbConnections = new ConcurrentHashMap<String, Connection>();
98         int listenPort = defaultOvsdbPort;
99         String portString = System.getProperty("ovsdb.listenPort");
100         if (portString != null) {
101             listenPort = Integer.decode(portString).intValue();
102         }
103         ovsdbListenPort = listenPort;
104     }
105
106     /**
107      * Function called by the dependency manager when at least one dependency
108      * become unsatisfied or when the component is shutting down because for
109      * example bundle is being stopped.
110      */
111     void destroy() {
112     }
113
114     /**
115      * Function called by dependency manager after "init ()" is called and after
116      * the services provided by the class are registered in the service registry
117      */
118     void start() {
119         startOvsdbManager();
120     }
121
122     /**
123      * Function called by the dependency manager before the services exported by
124      * the component are unregistered, this will be followed by a "destroy ()"
125      * calls
126      */
127     void stopping() {
128         for (Connection connection : ovsdbConnections.values()) {
129             connection.disconnect();
130         }
131         serverListenChannel.disconnect();
132     }
133
134     @Override
135     public Status disconnect(Node node) {
136         String identifier = (String) node.getID();
137         Connection connection = ovsdbConnections.get(identifier);
138         if (connection != null) {
139             ovsdbConnections.remove(identifier);
140             return connection.disconnect();
141         } else {
142             return new Status(StatusCode.NOTFOUND);
143         }
144     }
145
146     @Override
147     public Node connect(String identifier, Map<ConnectionConstants, String> params) {
148         InetAddress address;
149         Integer port;
150
151         try {
152             address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
153         } catch (Exception e) {
154             e.printStackTrace();
155             return null;
156         }
157
158         try {
159             port = Integer.parseInt(params.get(ConnectionConstants.PORT));
160             if (port == 0) port = defaultOvsdbPort;
161         } catch (Exception e) {
162             port = defaultOvsdbPort;
163         }
164
165         try {
166             Bootstrap bootstrap = new Bootstrap();
167             bootstrap.group(new NioEventLoopGroup());
168             bootstrap.channel(NioSocketChannel.class);
169             bootstrap.option(ChannelOption.TCP_NODELAY, true);
170             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
171
172             bootstrap.handler(new ChannelInitializer<SocketChannel>() {
173                 @Override
174                 public void initChannel(SocketChannel channel) throws Exception {
175                     if (handlers == null) {
176                         channel.pipeline().addLast(
177                                 //new LoggingHandler(LogLevel.INFO),
178                                 new JsonRpcDecoder(100000),
179                                 new StringEncoder(CharsetUtil.UTF_8));
180                     } else {
181                         for (ChannelHandler handler : handlers) {
182                             channel.pipeline().addLast(handler);
183                         }
184                     }
185                 }
186             });
187
188             ChannelFuture future = bootstrap.connect(address, port).sync();
189             Channel channel = future.channel();
190             return handleNewConnection(identifier, channel, this);
191         } catch (Exception e) {
192             e.printStackTrace();
193         }
194         return null;
195     }
196
197     public List<ChannelHandler> getHandlers() {
198         return handlers;
199     }
200
201     public void setHandlers(List<ChannelHandler> handlers) {
202         this.handlers = handlers;
203     }
204
205     @Override
206     public Connection getConnection(Node node) {
207         String identifier = (String) node.getID();
208         return ovsdbConnections.get(identifier);
209     }
210
211     @Override
212     public void notifyClusterViewChanged() {
213     }
214
215     @Override
216     public void notifyNodeDisconnectFromMaster(Node arg0) {
217     }
218
219     private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
220         Connection connection = new Connection(identifier, channel);
221         Node node = connection.getNode();
222
223         ObjectMapper objectMapper = new ObjectMapper();
224         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
225         objectMapper.setSerializationInclusion(Include.NON_NULL);
226
227         JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
228         JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
229         binderHandler.setNode(node);
230         channel.pipeline().addLast(binderHandler);
231
232         OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
233         connection.setRpc(ovsdb);
234         ovsdb.registerCallback(instance);
235         ovsdbConnections.put(identifier, connection);
236
237         ChannelConnectionHandler handler = new ChannelConnectionHandler();
238         handler.setNode(node);
239         handler.setConnectionService(this);
240         ChannelFuture closeFuture = channel.closeFuture();
241         closeFuture.addListener(handler);
242         // Keeping the Initial inventory update(s) on its own thread.
243         new Thread() {
244             Connection connection;
245             String identifier;
246
247             @Override
248             public void run() {
249                 try {
250                     initializeInventoryForNewNode(connection);
251                 } catch (Exception e) {
252                     e.printStackTrace();
253                     ovsdbConnections.remove(identifier);
254                 }
255             }
256             public Thread initializeConnectionParams(String identifier, Connection connection) {
257                 this.identifier = identifier;
258                 this.connection = connection;
259                 return this;
260             }
261         }.initializeConnectionParams(identifier, connection).start();
262         return node;
263     }
264
265     public void channelClosed(Node node) throws Exception {
266         logger.error("Connection to Node : {} closed", node);
267         inventoryServiceInternal.removeNode(node);
268     }
269
270     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
271         Channel channel = connection.getChannel();
272         InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
273         int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
274         IPAddressProperty addressProp = new IPAddressProperty(address);
275         L4PortProperty l4Port = new L4PortProperty(port);
276         Set<Property> props = new HashSet<Property>();
277         props.add(addressProp);
278         props.add(l4Port);
279         inventoryServiceInternal.addNode(connection.getNode(), props);
280
281         List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
282         ListenableFuture<DatabaseSchema> dbSchemaF = connection.getRpc().get_schema(dbNames);
283         DatabaseSchema databaseSchema = dbSchemaF.get();
284         inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
285
286         MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
287         for (Table<?> table : Tables.getTables()) {
288             if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
289                 monitorReq.monitor(table);
290             } else {
291                 logger.warn("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
292             }
293         }
294
295         ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
296         TableUpdates updates = monResponse.get();
297         if (updates.getError() != null) {
298             logger.error("Error configuring monitor, error : {}, details : {}",
299                     updates.getError(),
300                     updates.getDetails());
301             /* FIXME: This should be cause for alarm */
302             throw new RuntimeException("Failed to setup a monitor in OVSDB");
303         }
304         UpdateNotification monitor = new UpdateNotification();
305         monitor.setUpdate(updates);
306         this.update(connection.getNode(), monitor);
307         // With the existing bridges learnt, now it is time to update the OF Controller connections.
308         this.updateOFControllers(connection.getNode());
309     }
310
311     private void startOvsdbManager() {
312         new Thread() {
313             @Override
314             public void run() {
315                 ovsdbManager();
316             }
317         }.start();
318     }
319
320     private void ovsdbManager() {
321         EventLoopGroup bossGroup = new NioEventLoopGroup();
322         EventLoopGroup workerGroup = new NioEventLoopGroup();
323         try {
324             ServerBootstrap b = new ServerBootstrap();
325             b.group(bossGroup, workerGroup)
326              .channel(NioServerSocketChannel.class)
327              .option(ChannelOption.SO_BACKLOG, 100)
328              .handler(new LoggingHandler(LogLevel.INFO))
329              .childHandler(new ChannelInitializer<SocketChannel>() {
330                  @Override
331                  public void initChannel(SocketChannel channel) throws Exception {
332                      logger.debug("New Passive channel created : "+ channel.toString());
333                      InetAddress address = channel.remoteAddress().getAddress();
334                      int port = channel.remoteAddress().getPort();
335                      String identifier = address.getHostAddress()+":"+port;
336                      channel.pipeline().addLast(
337                              new LoggingHandler(LogLevel.INFO),
338                              new JsonRpcDecoder(100000),
339                              new StringEncoder(CharsetUtil.UTF_8));
340
341                      Node node = handleNewConnection(identifier, channel, ConnectionService.this);
342                      logger.debug("Connected Node : "+node.toString());
343                  }
344              });
345             b.option(ChannelOption.TCP_NODELAY, true);
346             b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
347             // Start the server.
348             ChannelFuture f = b.bind(ovsdbListenPort).sync();
349             serverListenChannel =  f.channel();
350             // Wait until the server socket is closed.
351             serverListenChannel.closeFuture().sync();
352         } catch (InterruptedException e) {
353             e.printStackTrace();
354         } finally {
355             // Shut down all event loops to terminate all threads.
356             bossGroup.shutdownGracefully();
357             workerGroup.shutdownGracefully();
358         }
359     }
360
361     private IClusterGlobalServices clusterServices;
362
363     public void setClusterServices(IClusterGlobalServices i) {
364         this.clusterServices = i;
365     }
366
367     public void unsetClusterServices(IClusterGlobalServices i) {
368         if (this.clusterServices == i) {
369             this.clusterServices = null;
370         }
371     }
372
373     private List<InetAddress> getControllerIPAddresses() {
374         List<InetAddress> controllers = null;
375         if (clusterServices != null) {
376             controllers = clusterServices.getClusteredControllers();
377             if (controllers != null && controllers.size() > 0) {
378                 if (controllers.size() == 1) {
379                     InetAddress controller = controllers.get(0);
380                     if (!controller.equals(InetAddress.getLoopbackAddress())) {
381                         return controllers;
382                     }
383                 } else {
384                     return controllers;
385                 }
386             }
387         }
388
389         controllers = new ArrayList<InetAddress>();
390         String addressString = System.getProperty("ovsdb.controller.address");
391         if (addressString == null) addressString = System.getProperty("of.address");
392
393         if (addressString != null) {
394             InetAddress controllerIP = null;
395             try {
396                 controllerIP = InetAddress.getByName(addressString);
397                 if (controllerIP != null) {
398                     controllers.add(controllerIP);
399                     return controllers;
400                 }
401             } catch (Exception e) {
402                 logger.debug("Invalid IP: {}, use wildcard *", addressString);
403             }
404         }
405
406         Enumeration<NetworkInterface> nets;
407         try {
408             nets = NetworkInterface.getNetworkInterfaces();
409             for (NetworkInterface netint : Collections.list(nets)) {
410                 Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
411                 for (InetAddress inetAddress : Collections.list(inetAddresses)) {
412                     if (!inetAddress.isLoopbackAddress() &&
413                             NetUtils.isIPv4AddressValid(inetAddress.getHostAddress())) {
414                         controllers.add(inetAddress);
415                     }
416                 }
417             }
418         } catch (SocketException e) {
419             controllers.add(InetAddress.getLoopbackAddress());
420         }
421         return controllers;
422     }
423
424     private short getControllerOFPort() {
425         Short defaultOpenFlowPort = 6633;
426         Short openFlowPort = defaultOpenFlowPort;
427         String portString = System.getProperty("of.listenPort");
428         if (portString != null) {
429             try {
430                 openFlowPort = Short.decode(portString).shortValue();
431             } catch (NumberFormatException e) {
432                 logger.warn("Invalid port:{}, use default({})", portString,
433                         openFlowPort);
434             }
435         }
436         return openFlowPort;
437     }
438
439     @Override
440     public Boolean setOFController(Node node, String bridgeUUID) throws InterruptedException, ExecutionException {
441         Connection connection = this.getConnection(node);
442         if (connection == null) {
443             return false;
444         }
445
446         if (connection != null) {
447             List<InetAddress> ofControllerAddrs = this.getControllerIPAddresses();
448             short ofControllerPort = getControllerOFPort();
449             for (InetAddress ofControllerAddress : ofControllerAddrs) {
450                 String newController = "tcp:"+ofControllerAddress.getHostAddress()+":"+ofControllerPort;
451                 Controller controllerRow = new Controller();
452                 controllerRow.setTarget(newController);
453                 OVSDBConfigService ovsdbTable = (OVSDBConfigService)ServiceHelper.getGlobalInstance(OVSDBConfigService.class, this);
454                 if (ovsdbTable != null) {
455                     ovsdbTable.insertRow(node, Controller.NAME.getName(), bridgeUUID, controllerRow);
456                 }
457             }
458         }
459         return true;
460     }
461
462     private void updateOFControllers (Node node) {
463         Map<String, Table<?>> bridges = inventoryServiceInternal.getTableCache(node, Bridge.NAME.getName());
464         for (String bridgeUUID : bridges.keySet()) {
465             try {
466                 this.setOFController(node, bridgeUUID);
467             } catch (Exception e) {
468                 e.printStackTrace();
469             }
470         }
471     }
472
473     @Override
474     public void update(Node node, UpdateNotification updateNotification) {
475         if (updateNotification == null) return;
476         inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());
477     }
478
479     @Override
480     public void locked(Node node, List<String> ids) {
481         // TODO Auto-generated method stub
482     }
483
484     @Override
485     public void stolen(Node node, List<String> ids) {
486         // TODO Auto-generated method stub
487     }
488
489 }