1 package org.opendaylight.ovsdb.plugin;
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.bootstrap.ServerBootstrap;
5 import io.netty.channel.AdaptiveRecvByteBufAllocator;
6 import io.netty.channel.Channel;
7 import io.netty.channel.ChannelFuture;
8 import io.netty.channel.ChannelHandler;
9 import io.netty.channel.ChannelInitializer;
10 import io.netty.channel.ChannelOption;
11 import io.netty.channel.EventLoopGroup;
12 import io.netty.channel.nio.NioEventLoopGroup;
13 import io.netty.channel.socket.SocketChannel;
14 import io.netty.channel.socket.nio.NioServerSocketChannel;
15 import io.netty.channel.socket.nio.NioSocketChannel;
16 import io.netty.handler.codec.string.StringEncoder;
17 import io.netty.handler.logging.LogLevel;
18 import io.netty.handler.logging.LoggingHandler;
19 import io.netty.util.CharsetUtil;
21 import java.net.InetAddress;
22 import java.net.InetSocketAddress;
23 import java.net.NetworkInterface;
24 import java.net.SocketException;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.Enumeration;
29 import java.util.HashSet;
30 import java.util.List;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.ExecutionException;
37 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
38 import org.opendaylight.controller.sal.connection.ConnectionConstants;
39 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.utils.NetUtils;
43 import org.opendaylight.controller.sal.utils.ServiceHelper;
44 import org.opendaylight.controller.sal.utils.Status;
45 import org.opendaylight.controller.sal.utils.StatusCode;
46 import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
47 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
48 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
49 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
50 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
51 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
52 import org.opendaylight.ovsdb.lib.message.TableUpdates;
53 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
54 import org.opendaylight.ovsdb.lib.table.Bridge;
55 import org.opendaylight.ovsdb.lib.table.Controller;
56 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
57 import org.opendaylight.ovsdb.lib.table.internal.Table;
58 import org.opendaylight.ovsdb.lib.table.internal.Tables;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
62 import com.fasterxml.jackson.annotation.JsonInclude.Include;
63 import com.fasterxml.jackson.databind.DeserializationFeature;
64 import com.fasterxml.jackson.databind.ObjectMapper;
65 import com.google.common.util.concurrent.ListenableFuture;
69 * Represents the openflow plugin component in charge of programming the flows
70 * the flow programming and relay them to functional modules above SAL.
72 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
73 protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
75 private static final Integer defaultOvsdbPort = 6640;
76 private static Integer ovsdbListenPort = defaultOvsdbPort;
77 private ConcurrentMap<String, Connection> ovsdbConnections;
78 private List<ChannelHandler> handlers = null;
79 private InventoryServiceInternal inventoryServiceInternal;
80 private Channel serverListenChannel = null;
82 public InventoryServiceInternal getInventoryServiceInternal() {
83 return inventoryServiceInternal;
86 public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
87 this.inventoryServiceInternal = inventoryServiceInternal;
90 public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
91 if (this.inventoryServiceInternal == inventoryServiceInternal) {
92 this.inventoryServiceInternal = null;
97 ovsdbConnections = new ConcurrentHashMap<String, Connection>();
98 int listenPort = defaultOvsdbPort;
99 String portString = System.getProperty("ovsdb.listenPort");
100 if (portString != null) {
101 listenPort = Integer.decode(portString).intValue();
103 ovsdbListenPort = listenPort;
107 * Function called by the dependency manager when at least one dependency
108 * become unsatisfied or when the component is shutting down because for
109 * example bundle is being stopped.
115 * Function called by dependency manager after "init ()" is called and after
116 * the services provided by the class are registered in the service registry
123 * Function called by the dependency manager before the services exported by
124 * the component are unregistered, this will be followed by a "destroy ()"
128 for (Connection connection : ovsdbConnections.values()) {
129 connection.disconnect();
131 serverListenChannel.disconnect();
135 public Status disconnect(Node node) {
136 String identifier = (String) node.getID();
137 Connection connection = ovsdbConnections.get(identifier);
138 if (connection != null) {
139 ovsdbConnections.remove(identifier);
140 return connection.disconnect();
142 return new Status(StatusCode.NOTFOUND);
147 public Node connect(String identifier, Map<ConnectionConstants, String> params) {
152 address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
153 } catch (Exception e) {
159 port = Integer.parseInt(params.get(ConnectionConstants.PORT));
160 if (port == 0) port = defaultOvsdbPort;
161 } catch (Exception e) {
162 port = defaultOvsdbPort;
166 Bootstrap bootstrap = new Bootstrap();
167 bootstrap.group(new NioEventLoopGroup());
168 bootstrap.channel(NioSocketChannel.class);
169 bootstrap.option(ChannelOption.TCP_NODELAY, true);
170 bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
172 bootstrap.handler(new ChannelInitializer<SocketChannel>() {
174 public void initChannel(SocketChannel channel) throws Exception {
175 if (handlers == null) {
176 channel.pipeline().addLast(
177 //new LoggingHandler(LogLevel.INFO),
178 new JsonRpcDecoder(100000),
179 new StringEncoder(CharsetUtil.UTF_8));
181 for (ChannelHandler handler : handlers) {
182 channel.pipeline().addLast(handler);
188 ChannelFuture future = bootstrap.connect(address, port).sync();
189 Channel channel = future.channel();
190 return handleNewConnection(identifier, channel, this);
191 } catch (Exception e) {
197 public List<ChannelHandler> getHandlers() {
201 public void setHandlers(List<ChannelHandler> handlers) {
202 this.handlers = handlers;
206 public Connection getConnection(Node node) {
207 String identifier = (String) node.getID();
208 return ovsdbConnections.get(identifier);
212 public void notifyClusterViewChanged() {
216 public void notifyNodeDisconnectFromMaster(Node arg0) {
219 private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
220 Connection connection = new Connection(identifier, channel);
221 Node node = connection.getNode();
223 ObjectMapper objectMapper = new ObjectMapper();
224 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
225 objectMapper.setSerializationInclusion(Include.NON_NULL);
227 JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
228 JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
229 binderHandler.setNode(node);
230 channel.pipeline().addLast(binderHandler);
232 OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
233 connection.setRpc(ovsdb);
234 ovsdb.registerCallback(instance);
235 ovsdbConnections.put(identifier, connection);
237 ChannelConnectionHandler handler = new ChannelConnectionHandler();
238 handler.setNode(node);
239 handler.setConnectionService(this);
240 ChannelFuture closeFuture = channel.closeFuture();
241 closeFuture.addListener(handler);
242 // Keeping the Initial inventory update(s) on its own thread.
244 Connection connection;
250 initializeInventoryForNewNode(connection);
251 } catch (Exception e) {
253 ovsdbConnections.remove(identifier);
256 public Thread initializeConnectionParams(String identifier, Connection connection) {
257 this.identifier = identifier;
258 this.connection = connection;
261 }.initializeConnectionParams(identifier, connection).start();
265 public void channelClosed(Node node) throws Exception {
266 logger.error("Connection to Node : {} closed", node);
267 inventoryServiceInternal.removeNode(node);
270 private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
271 Channel channel = connection.getChannel();
272 InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
273 int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
274 IPAddressProperty addressProp = new IPAddressProperty(address);
275 L4PortProperty l4Port = new L4PortProperty(port);
276 Set<Property> props = new HashSet<Property>();
277 props.add(addressProp);
279 inventoryServiceInternal.addNode(connection.getNode(), props);
281 List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
282 ListenableFuture<DatabaseSchema> dbSchemaF = connection.getRpc().get_schema(dbNames);
283 DatabaseSchema databaseSchema = dbSchemaF.get();
284 inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
286 MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
287 for (Table<?> table : Tables.getTables()) {
288 if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
289 monitorReq.monitor(table);
291 logger.warn("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
295 ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
296 TableUpdates updates = monResponse.get();
297 if (updates.getError() != null) {
298 logger.error("Error configuring monitor, error : {}, details : {}",
300 updates.getDetails());
301 /* FIXME: This should be cause for alarm */
302 throw new RuntimeException("Failed to setup a monitor in OVSDB");
304 UpdateNotification monitor = new UpdateNotification();
305 monitor.setUpdate(updates);
306 this.update(connection.getNode(), monitor);
307 // With the existing bridges learnt, now it is time to update the OF Controller connections.
308 this.updateOFControllers(connection.getNode());
311 private void startOvsdbManager() {
320 private void ovsdbManager() {
321 EventLoopGroup bossGroup = new NioEventLoopGroup();
322 EventLoopGroup workerGroup = new NioEventLoopGroup();
324 ServerBootstrap b = new ServerBootstrap();
325 b.group(bossGroup, workerGroup)
326 .channel(NioServerSocketChannel.class)
327 .option(ChannelOption.SO_BACKLOG, 100)
328 .handler(new LoggingHandler(LogLevel.INFO))
329 .childHandler(new ChannelInitializer<SocketChannel>() {
331 public void initChannel(SocketChannel channel) throws Exception {
332 logger.debug("New Passive channel created : "+ channel.toString());
333 InetAddress address = channel.remoteAddress().getAddress();
334 int port = channel.remoteAddress().getPort();
335 String identifier = address.getHostAddress()+":"+port;
336 channel.pipeline().addLast(
337 new LoggingHandler(LogLevel.INFO),
338 new JsonRpcDecoder(100000),
339 new StringEncoder(CharsetUtil.UTF_8));
341 Node node = handleNewConnection(identifier, channel, ConnectionService.this);
342 logger.debug("Connected Node : "+node.toString());
345 b.option(ChannelOption.TCP_NODELAY, true);
346 b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
348 ChannelFuture f = b.bind(ovsdbListenPort).sync();
349 serverListenChannel = f.channel();
350 // Wait until the server socket is closed.
351 serverListenChannel.closeFuture().sync();
352 } catch (InterruptedException e) {
355 // Shut down all event loops to terminate all threads.
356 bossGroup.shutdownGracefully();
357 workerGroup.shutdownGracefully();
361 private IClusterGlobalServices clusterServices;
363 public void setClusterServices(IClusterGlobalServices i) {
364 this.clusterServices = i;
367 public void unsetClusterServices(IClusterGlobalServices i) {
368 if (this.clusterServices == i) {
369 this.clusterServices = null;
373 private List<InetAddress> getControllerIPAddresses() {
374 List<InetAddress> controllers = null;
375 if (clusterServices != null) {
376 controllers = clusterServices.getClusteredControllers();
377 if (controllers != null && controllers.size() > 0) {
378 if (controllers.size() == 1) {
379 InetAddress controller = controllers.get(0);
380 if (!controller.equals(InetAddress.getLoopbackAddress())) {
389 controllers = new ArrayList<InetAddress>();
390 String addressString = System.getProperty("ovsdb.controller.address");
391 if (addressString == null) addressString = System.getProperty("of.address");
393 if (addressString != null) {
394 InetAddress controllerIP = null;
396 controllerIP = InetAddress.getByName(addressString);
397 if (controllerIP != null) {
398 controllers.add(controllerIP);
401 } catch (Exception e) {
402 logger.debug("Invalid IP: {}, use wildcard *", addressString);
406 Enumeration<NetworkInterface> nets;
408 nets = NetworkInterface.getNetworkInterfaces();
409 for (NetworkInterface netint : Collections.list(nets)) {
410 Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
411 for (InetAddress inetAddress : Collections.list(inetAddresses)) {
412 if (!inetAddress.isLoopbackAddress() &&
413 NetUtils.isIPv4AddressValid(inetAddress.getHostAddress())) {
414 controllers.add(inetAddress);
418 } catch (SocketException e) {
419 controllers.add(InetAddress.getLoopbackAddress());
424 private short getControllerOFPort() {
425 Short defaultOpenFlowPort = 6633;
426 Short openFlowPort = defaultOpenFlowPort;
427 String portString = System.getProperty("of.listenPort");
428 if (portString != null) {
430 openFlowPort = Short.decode(portString).shortValue();
431 } catch (NumberFormatException e) {
432 logger.warn("Invalid port:{}, use default({})", portString,
440 public Boolean setOFController(Node node, String bridgeUUID) throws InterruptedException, ExecutionException {
441 Connection connection = this.getConnection(node);
442 if (connection == null) {
446 if (connection != null) {
447 List<InetAddress> ofControllerAddrs = this.getControllerIPAddresses();
448 short ofControllerPort = getControllerOFPort();
449 for (InetAddress ofControllerAddress : ofControllerAddrs) {
450 String newController = "tcp:"+ofControllerAddress.getHostAddress()+":"+ofControllerPort;
451 Controller controllerRow = new Controller();
452 controllerRow.setTarget(newController);
453 OVSDBConfigService ovsdbTable = (OVSDBConfigService)ServiceHelper.getGlobalInstance(OVSDBConfigService.class, this);
454 if (ovsdbTable != null) {
455 ovsdbTable.insertRow(node, Controller.NAME.getName(), bridgeUUID, controllerRow);
462 private void updateOFControllers (Node node) {
463 Map<String, Table<?>> bridges = inventoryServiceInternal.getTableCache(node, Bridge.NAME.getName());
464 for (String bridgeUUID : bridges.keySet()) {
466 this.setOFController(node, bridgeUUID);
467 } catch (Exception e) {
474 public void update(Node node, UpdateNotification updateNotification) {
475 if (updateNotification == null) return;
476 inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());
480 public void locked(Node node, List<String> ids) {
481 // TODO Auto-generated method stub
485 public void stolen(Node node, List<String> ids) {
486 // TODO Auto-generated method stub