2 * Copyright (C) 2013 Red Hat, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller
10 package org.opendaylight.ovsdb.plugin;
12 import io.netty.bootstrap.Bootstrap;
13 import io.netty.bootstrap.ServerBootstrap;
14 import io.netty.channel.AdaptiveRecvByteBufAllocator;
15 import io.netty.channel.Channel;
16 import io.netty.channel.ChannelFuture;
17 import io.netty.channel.ChannelHandler;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.nio.NioEventLoopGroup;
22 import io.netty.channel.socket.SocketChannel;
23 import io.netty.channel.socket.nio.NioServerSocketChannel;
24 import io.netty.channel.socket.nio.NioSocketChannel;
25 import io.netty.handler.codec.string.StringEncoder;
26 import io.netty.handler.logging.LogLevel;
27 import io.netty.handler.logging.LoggingHandler;
28 import io.netty.util.CharsetUtil;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.UnknownHostException;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.HashSet;
36 import java.util.List;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentMap;
41 import java.util.concurrent.ExecutionException;
43 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
44 import org.opendaylight.controller.sal.connection.ConnectionConstants;
45 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
46 import org.opendaylight.controller.sal.core.Node;
47 import org.opendaylight.controller.sal.core.Property;
48 import org.opendaylight.controller.sal.utils.ServiceHelper;
49 import org.opendaylight.controller.sal.utils.Status;
50 import org.opendaylight.controller.sal.utils.StatusCode;
51 import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
52 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
53 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
54 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
55 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
56 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
57 import org.opendaylight.ovsdb.lib.message.temp.TableUpdates;
58 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
59 import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
60 import org.opendaylight.ovsdb.lib.table.Bridge;
61 import org.opendaylight.ovsdb.lib.table.Controller;
62 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
63 import org.opendaylight.ovsdb.lib.table.Table;
64 import org.opendaylight.ovsdb.lib.table.Tables;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
68 import com.fasterxml.jackson.annotation.JsonInclude.Include;
69 import com.fasterxml.jackson.databind.DeserializationFeature;
70 import com.fasterxml.jackson.databind.ObjectMapper;
71 import com.google.common.util.concurrent.ListenableFuture;
75 * Represents the OVSDB plugin component in charge of establishing and tracking
76 * connections to OVSDB nodes and relaying their table updates to the inventory service.
78 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
// Logger shared by this service.
79 protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
81 // Properties that can be set in config.ini
82 private static final String OVSDB_LISTENPORT = "ovsdb.listenPort";
83 private static final String OVSDB_AUTOCONFIGURECONTROLLER = "ovsdb.autoconfigurecontroller";
// OPENFLOW_10 is the default passed when setOFController reads the "ovsdb.of.version" property.
// OPENFLOW_13 is presumably the other accepted value; the comparison is not visible in this excerpt.
84 protected static final String OPENFLOW_10 = "1.0";
85 protected static final String OPENFLOW_13 = "1.3";
// Defaults used when the corresponding system property is not set.
87 private static final Integer defaultOvsdbPort = 6640;
88 private static final boolean defaultAutoConfigureController = true;
// Effective settings. These are static, so they are shared by every instance of this class.
90 private static Integer ovsdbListenPort = defaultOvsdbPort;
91 private static boolean autoConfigureController = defaultAutoConfigureController;
// Tracked connections keyed by connection identifier (the same string used as the Node ID in disconnect/getConnection).
92 private ConcurrentMap<String, Connection> ovsdbConnections;
// Optional custom pipeline handlers for outbound connections; when null, connect() installs a JsonRpcDecoder and a StringEncoder.
93 private List<ChannelHandler> handlers = null;
// Inventory collaborator, assigned and cleared through the set/unset methods.
94 private InventoryServiceInternal inventoryServiceInternal;
// Channel of the passive listener; assigned in ovsdbManager() after bind completes.
95 private Channel serverListenChannel = null;
97 public InventoryServiceInternal getInventoryServiceInternal() {
98 return inventoryServiceInternal;
101 public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
102 this.inventoryServiceInternal = inventoryServiceInternal;
105 public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
106 if (this.inventoryServiceInternal == inventoryServiceInternal) {
107 this.inventoryServiceInternal = null;
// Initialisation fragment; the enclosing method header is not visible in this excerpt.
// Creates the connection registry and loads the listen port and the auto-configure flag
// from system properties, keeping the defaults when a property is absent.
112 ovsdbConnections = new ConcurrentHashMap<String, Connection>();
113 int listenPort = defaultOvsdbPort;
114 String portString = System.getProperty(OVSDB_LISTENPORT);
115 if (portString != null) {
// NOTE(review): Integer.decode throws NumberFormatException on a malformed value and no handler is visible here.
116 listenPort = Integer.decode(portString).intValue();
118 ovsdbListenPort = listenPort;
120 // Keep the default value if the property is not set
121 if (System.getProperty(OVSDB_AUTOCONFIGURECONTROLLER) != null)
// Boolean.getBoolean takes a property name and reads that system property itself.
122 autoConfigureController = Boolean.getBoolean(OVSDB_AUTOCONFIGURECONTROLLER);
126 * Function called by the dependency manager when at least one dependency
127 * becomes unsatisfied or when the component is shutting down because, for
128 * example, the bundle is being stopped.
134 * Function called by the dependency manager after "init ()" is called and after
135 * the services provided by the class are registered in the service registry.
142 * Function called by the dependency manager before the services exported by
143 * the component are unregistered; this will be followed by a "destroy ()"
// Tear-down fragment; the enclosing method header is not visible in this excerpt.
// Disconnects every tracked connection, then the listening channel.
147 for (Connection connection : ovsdbConnections.values()) {
148 connection.disconnect();
// NOTE(review): serverListenChannel starts as null and is only assigned in ovsdbManager() after bind;
// if it is still null here this call throws NullPointerException. Confirm and guard.
150 serverListenChannel.disconnect();
154 public Status disconnect(Node node) {
155 String identifier = (String) node.getID();
156 Connection connection = ovsdbConnections.get(identifier);
157 if (connection != null) {
158 ovsdbConnections.remove(identifier);
159 return connection.disconnect();
161 return new Status(StatusCode.NOTFOUND);
/**
 * Opens an active (client-side) connection to an OVSDB server and hands the resulting
 * channel to handleNewConnection under the given identifier.
 *
 * NOTE(review): several lines of this method (local declarations, try/else/closing lines and
 * the failure-path returns) are not visible in this excerpt; the comments below cover only
 * what is shown.
 *
 * @param identifier key under which the new connection is registered
 * @param params connection parameters; the ADDRESS and PORT entries are read
 * @return the Node returned by handleNewConnection on success; the failure-path result is
 *         not visible here (presumably null, to be confirmed)
 */
166 public Node connect(String identifier, Map<ConnectionConstants, String> params) {
// Resolve the target host; any failure is logged.
171 address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
172 } catch (Exception e) {
173 logger.error("Unable to resolve " + params.get(ConnectionConstants.ADDRESS), e);
// Parse the port; a value of 0 or any parse failure falls back to defaultOvsdbPort.
178 port = Integer.parseInt(params.get(ConnectionConstants.PORT));
179 if (port == 0) port = defaultOvsdbPort;
180 } catch (Exception e) {
181 port = defaultOvsdbPort;
// NOTE(review): a new NioEventLoopGroup is created on every call and no shutdown is visible
// in this excerpt. Confirm it is released somewhere.
185 Bootstrap bootstrap = new Bootstrap();
186 bootstrap.group(new NioEventLoopGroup());
187 bootstrap.channel(NioSocketChannel.class);
188 bootstrap.option(ChannelOption.TCP_NODELAY, true);
189 bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
191 bootstrap.handler(new ChannelInitializer<SocketChannel>() {
193 public void initChannel(SocketChannel channel) throws Exception {
// Default pipeline when no custom handlers were supplied through setHandlers().
194 if (handlers == null) {
195 channel.pipeline().addLast(
196 //new LoggingHandler(LogLevel.INFO),
197 new JsonRpcDecoder(100000),
198 new StringEncoder(CharsetUtil.UTF_8));
// Supplied handlers are appended in list order (the separating else line is not visible in this excerpt).
200 for (ChannelHandler handler : handlers) {
201 channel.pipeline().addLast(handler);
// Block until the connect attempt completes, then finish the set-up for the new channel.
207 ChannelFuture future = bootstrap.connect(address, port).sync();
208 Channel channel = future.channel();
209 return handleNewConnection(identifier, channel, this);
210 } catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored in the visible lines.
211 logger.error("Thread was interrupted during connect", e);
212 } catch (ExecutionException e) {
213 logger.error("ExecutionException in handleNewConnection for identifier " + identifier, e);
// Accessor for the custom channel handlers. The body is not visible in this excerpt
// (presumably it returns the handlers field; confirm).
218 public List<ChannelHandler> getHandlers() {
222 public void setHandlers(List<ChannelHandler> handlers) {
223 this.handlers = handlers;
227 public Connection getConnection(Node node) {
228 String identifier = (String) node.getID();
229 return ovsdbConnections.get(identifier);
// Collects the Node of every tracked connection into a new list.
// The closing lines, including the return statement, are not visible in this excerpt
// (presumably the list built here is returned; confirm).
233 public List<Node> getNodes() {
234 List<Node> nodes = new ArrayList<Node>();
235 for (Connection connection : ovsdbConnections.values()) {
236 nodes.add(connection.getNode());
// Cluster-view change callback. The body is not visible in this excerpt.
242 public void notifyClusterViewChanged() {
// Callback for a node disconnecting from the master. The body is not visible in this excerpt.
246 public void notifyNodeDisconnectFromMaster(Node arg0) {
/**
 * Wires up a freshly opened channel: wraps it in a Connection, attaches the JSON-RPC
 * binder to the pipeline, registers the callback, records the connection in the registry,
 * and starts a separate thread that performs the initial inventory load.
 *
 * NOTE(review): parts of the anonymous thread class and the final return are not visible
 * in this excerpt.
 *
 * @param identifier registry key for the new connection
 * @param channel the connected channel
 * @param instance callback target registered with the RPC client
 */
249 private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
250 Connection connection = new Connection(identifier, channel);
251 Node node = connection.getNode();
// Mapper is configured to ignore unknown properties when reading and to omit nulls when writing.
253 ObjectMapper objectMapper = new ObjectMapper();
254 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
255 objectMapper.setSerializationInclusion(Include.NON_NULL);
// The binder handler is appended after whatever handlers the caller already installed.
257 JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
258 JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
259 binderHandler.setContext(node);
260 channel.pipeline().addLast(binderHandler);
// The connection is published in the registry before its inventory has been initialised.
262 OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
263 connection.setRpc(ovsdb);
264 ovsdb.registerCallback(instance);
265 ovsdbConnections.put(identifier, connection);
// The handler is attached as a listener on the channel's close future.
267 ChannelConnectionHandler handler = new ChannelConnectionHandler();
268 handler.setNode(node);
269 handler.setConnectionService(this);
270 ChannelFuture closeFuture = channel.closeFuture();
271 closeFuture.addListener(handler);
272 // Keeping the Initial inventory update(s) on its own thread.
274 Connection connection;
280 initializeInventoryForNewNode(connection);
281 } catch (InterruptedException | ExecutionException e) {
// On failure the registry entry is removed again; closing the channel is not visible here.
282 logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
283 ovsdbConnections.remove(identifier);
// Stores the parameters on the anonymous thread instance before it is started.
286 public Thread initializeConnectionParams(String identifier, Connection connection) {
287 this.identifier = identifier;
288 this.connection = connection;
291 }.initializeConnectionParams(identifier, connection).start();
// Called when the channel for a node has closed: logs the event and removes the node from
// the inventory. One statement between the two visible ones is missing from this excerpt.
295 public void channelClosed(Node node) throws Exception {
296 logger.info("Connection to Node : {} closed", node);
298 inventoryServiceInternal.removeNode(node);
/**
 * Performs the initial inventory set-up for a new connection: registers the node with its
 * remote-address property, stores the database schema, processes the initial monitor
 * result, optionally configures OpenFlow controllers, and finally announces the node.
 *
 * NOTE(review): both futures below are currently assigned null and then dereferenced, so
 * as written this method throws NullPointerException at the first get(). The RPC calls
 * need to be restored against the new API.
 *
 * @param connection the connection whose node is being initialised
 */
301 private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
302 Channel channel = connection.getChannel();
303 InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
304 int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
305 IPAddressProperty addressProp = new IPAddressProperty(address);
306 L4PortProperty l4Port = new L4PortProperty(port);
307 Set<Property> props = new HashSet<Property>();
// Only the address property is visibly added; whether l4Port is added is not visible in this excerpt.
308 props.add(addressProp);
310 inventoryServiceInternal.addNode(connection.getNode(), props);
312 List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
// NOTE(review): dbSchemaF is null, so the get() on the next line always throws NullPointerException.
313 ListenableFuture<DatabaseSchema> dbSchemaF = null;//todo : fix it up to new structue : connection.getRpc().get_schema(dbNames);
314 DatabaseSchema databaseSchema = dbSchemaF.get();
315 inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
// With the monitor call commented out, this loop visibly only logs known tables missing from the schema.
317 MonitorRequestBuilder monitorReq = null; //ashwin(not sure if we need) : new MonitorRequestBuilder();
318 for (Table<?> table : Tables.getTables()) {
319 if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
320 //ashwin(not sure if we need) monitorReq.monitor(table);
322 logger.debug("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
// NOTE(review): monResponse is null as well, so the get() below would also throw.
326 ListenableFuture<TableUpdates> monResponse = null; //ashwin(not sure if we need)connection.getRpc().monitor(monitorReq);
327 TableUpdates updates = monResponse.get();
328 if (updates.getError() != null) {
329 logger.error("Error configuring monitor, error : {}, details : {}",
331 updates.getDetails());
332 /* FIXME: This should be cause for alarm */
333 throw new RuntimeException("Failed to setup a monitor in OVSDB");
// Feed the initial monitor result through the same path as later update notifications.
335 UpdateNotification monitor = new UpdateNotification();
336 monitor.setUpdate(updates);
337 this.update(connection.getNode(), monitor);
338 if (autoConfigureController) {
339 this.updateOFControllers(connection.getNode());
341 inventoryServiceInternal.notifyNodeAdded(connection.getNode());
// Starts the passive OVSDB listener. The body is not visible in this excerpt
// (presumably it runs ovsdbManager() on its own thread; confirm).
344 private void startOvsdbManager() {
/**
 * Runs the passive (server-side) OVSDB listener: binds to ovsdbListenPort, sets up each
 * accepted channel through handleNewConnection, and blocks until the server channel
 * closes, after which both event loop groups are shut down.
 *
 * NOTE(review): the try/finally lines are not visible in this excerpt.
 */
353 private void ovsdbManager() {
354 EventLoopGroup bossGroup = new NioEventLoopGroup();
355 EventLoopGroup workerGroup = new NioEventLoopGroup();
357 ServerBootstrap b = new ServerBootstrap();
358 b.group(bossGroup, workerGroup)
359 .channel(NioServerSocketChannel.class)
360 .option(ChannelOption.SO_BACKLOG, 100)
361 .handler(new LoggingHandler(LogLevel.INFO))
362 .childHandler(new ChannelInitializer<SocketChannel>() {
364 public void initChannel(SocketChannel channel) throws Exception {
365 logger.debug("New Passive channel created : "+ channel.toString());
// Passive connections are identified by the peer's "address:port".
366 InetAddress address = channel.remoteAddress().getAddress();
367 int port = channel.remoteAddress().getPort();
368 String identifier = address.getHostAddress()+":"+port;
369 channel.pipeline().addLast(
370 new LoggingHandler(LogLevel.INFO),
371 new JsonRpcDecoder(100000),
372 new StringEncoder(CharsetUtil.UTF_8));
374 Node node = handleNewConnection(identifier, channel, ConnectionService.this);
375 logger.debug("Connected Node : "+node.toString());
378 b.option(ChannelOption.TCP_NODELAY, true);
379 b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
381 ChannelFuture f = b.bind(ovsdbListenPort).sync();
382 serverListenChannel = f.channel();
383 // Wait until the server socket is closed.
384 serverListenChannel.closeFuture().sync();
385 } catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored in the visible lines.
386 logger.error("Thread interrupted", e);
388 // Shut down all event loops to terminate all threads.
389 bossGroup.shutdownGracefully();
390 workerGroup.shutdownGracefully();
// Cluster services collaborator; queried for clustered controller addresses in getControllerIPAddresses().
394 private IClusterGlobalServices clusterServices;
396 public void setClusterServices(IClusterGlobalServices i) {
397 this.clusterServices = i;
400 public void unsetClusterServices(IClusterGlobalServices i) {
401 if (this.clusterServices == i) {
402 this.clusterServices = null;
/**
 * Determines the controller IP addresses to advertise to a switch. The visible sources,
 * in order of appearance, are: the "ovsdb.controller.address" system property, the
 * clustered controllers reported by clusterServices, the "of.address" system property,
 * and the local address of the connection's channel.
 *
 * NOTE(review): the return statements and most control-flow lines are not visible in
 * this excerpt, so the exact precedence between these sources should be confirmed.
 *
 * @param connection connection whose channel supplies the last-resort local address
 */
406 private List<InetAddress> getControllerIPAddresses(Connection connection) {
407 List<InetAddress> controllers = null;
408 InetAddress controllerIP = null;
410 controllers = new ArrayList<InetAddress>();
411 String addressString = System.getProperty("ovsdb.controller.address");
413 if (addressString != null) {
415 controllerIP = InetAddress.getByName(addressString);
416 if (controllerIP != null) {
417 controllers.add(controllerIP);
420 } catch (UnknownHostException e) {
421 logger.error("Host {} is invalid", addressString);
425 if (clusterServices != null) {
// NOTE(review): this overwrites the list allocated above. If getClusteredControllers() returns
// null, the later controllers.add(...) calls would dereference null. Confirm against the hidden lines.
426 controllers = clusterServices.getClusteredControllers();
427 if (controllers != null && controllers.size() > 0) {
428 if (controllers.size() == 1) {
// A single clustered controller is checked against the loopback address.
429 InetAddress controller = controllers.get(0);
430 if (!controller.equals(InetAddress.getLoopbackAddress())) {
439 addressString = System.getProperty("of.address");
441 if (addressString != null) {
443 controllerIP = InetAddress.getByName(addressString);
444 if (controllerIP != null) {
445 controllers.add(controllerIP);
448 } catch (UnknownHostException e) {
449 logger.error("Host {} is invalid", addressString);
// Uses the local address of this connection's channel as the controller address.
454 controllerIP = ((InetSocketAddress)connection.getChannel().localAddress()).getAddress();
455 controllers.add(controllerIP);
457 } catch (Exception e) {
458 logger.debug("Invalid connection provided to getControllerIPAddresses", e);
/**
 * Returns the OpenFlow listen port to advertise: the "of.listenPort" system property when
 * it parses, otherwise 6633. The closing lines of this method are not visible in this excerpt.
 *
 * NOTE(review): Short.decode only accepts -32768..32767, so valid TCP ports 32768-65535
 * raise NumberFormatException here and end up on the invalid-port warning path.
 */
463 private short getControllerOFPort() {
464 Short defaultOpenFlowPort = 6633;
465 Short openFlowPort = defaultOpenFlowPort;
466 String portString = System.getProperty("of.listenPort");
467 if (portString != null) {
469 openFlowPort = Short.decode(portString).shortValue();
470 } catch (NumberFormatException e) {
471 logger.warn("Invalid port:{}, use default({})", portString,
/**
 * Sets the OpenFlow protocol version on a bridge and inserts one Controller row per
 * controller address, each targeting "tcp:&lt;address&gt;:&lt;port&gt;".
 *
 * NOTE(review): the version-selection lines and the return statements are not visible in
 * this excerpt.
 *
 * @param node node that owns the bridge
 * @param bridgeUUID UUID of the bridge row to update
 */
479 public Boolean setOFController(Node node, String bridgeUUID) throws InterruptedException, ExecutionException {
480 Connection connection = this.getConnection(node);
481 if (connection == null) {
485 OVSDBConfigService ovsdbTable = (OVSDBConfigService)ServiceHelper.getGlobalInstance(OVSDBConfigService.class, this);
486 OvsDBSet<String> protocols = new OvsDBSet<String>();
// The OpenFlow version comes from the "ovsdb.of.version" property and defaults to OPENFLOW_10.
488 String ofVersion = System.getProperty("ovsdb.of.version", OPENFLOW_10);
491 protocols.add("OpenFlow13");
495 protocols.add("OpenFlow10");
499 Bridge bridge = new Bridge();
500 bridge.setProtocols(protocols);
// NOTE(review): ovsdbTable is dereferenced here but only null-checked further down inside the
// loop; if the service lookup returned null this line throws NullPointerException first.
501 Status status = ovsdbTable.updateRow(node, Bridge.NAME.getName(), null, bridgeUUID, bridge);
502 logger.debug("Bridge {} updated to {} with Status {}", bridgeUUID, protocols.toArray()[0], status);
504 List<InetAddress> ofControllerAddrs = this.getControllerIPAddresses(connection);
505 short ofControllerPort = getControllerOFPort();
506 for (InetAddress ofControllerAddress : ofControllerAddrs) {
507 String newController = "tcp:"+ofControllerAddress.getHostAddress()+":"+ofControllerPort;
508 Controller controllerRow = new Controller();
509 controllerRow.setTarget(newController);
510 if (ovsdbTable != null) {
511 ovsdbTable.insertRow(node, Controller.NAME.getName(), bridgeUUID, controllerRow);
517 private void updateOFControllers (Node node) {
518 Map<String, Table<?>> bridges = inventoryServiceInternal.getTableCache(node, Bridge.NAME.getName());
519 if (bridges == null) return;
520 for (String bridgeUUID : bridges.keySet()) {
522 this.setOFController(node, bridgeUUID);
523 } catch (Exception e) {
524 logger.error("Failed updateOFControllers", e);
530 public void update(Object context, UpdateNotification updateNotification) {
531 if (updateNotification == null) return;
532 inventoryServiceInternal.processTableUpdates((Node)context, updateNotification.getUpdate());
// Lock callback; the visible body contains only the TODO below.
536 public void locked(Object context, List<String> ids) {
537 // TODO Auto-generated method stub
// Lock-stolen callback; the visible body contains only the TODO below.
541 public void stolen(Object context, List<String> ids) {
542 // TODO Auto-generated method stub