1 package org.opendaylight.ovsdb.plugin;
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.bootstrap.ServerBootstrap;
5 import io.netty.channel.AdaptiveRecvByteBufAllocator;
6 import io.netty.channel.Channel;
7 import io.netty.channel.ChannelFuture;
8 import io.netty.channel.ChannelHandler;
9 import io.netty.channel.ChannelInitializer;
10 import io.netty.channel.ChannelOption;
11 import io.netty.channel.EventLoopGroup;
12 import io.netty.channel.nio.NioEventLoopGroup;
13 import io.netty.channel.socket.SocketChannel;
14 import io.netty.channel.socket.nio.NioServerSocketChannel;
15 import io.netty.channel.socket.nio.NioSocketChannel;
16 import io.netty.handler.codec.string.StringEncoder;
17 import io.netty.handler.logging.LogLevel;
18 import io.netty.handler.logging.LoggingHandler;
19 import io.netty.util.CharsetUtil;
21 import java.net.InetAddress;
22 import java.net.InetSocketAddress;
23 import java.util.Arrays;
24 import java.util.HashSet;
25 import java.util.List;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.ExecutionException;
32 import org.opendaylight.controller.sal.connection.ConnectionConstants;
33 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
34 import org.opendaylight.controller.sal.core.Node;
35 import org.opendaylight.controller.sal.core.Property;
36 import org.opendaylight.controller.sal.utils.Status;
37 import org.opendaylight.controller.sal.utils.StatusCode;
38 import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
39 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
40 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
41 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
42 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
43 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
44 import org.opendaylight.ovsdb.lib.message.TableUpdates;
45 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
46 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
47 import org.opendaylight.ovsdb.lib.table.internal.Table;
48 import org.opendaylight.ovsdb.lib.table.internal.Tables;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 import com.fasterxml.jackson.annotation.JsonInclude.Include;
53 import com.fasterxml.jackson.databind.DeserializationFeature;
54 import com.fasterxml.jackson.databind.ObjectMapper;
55 import com.google.common.util.concurrent.ListenableFuture;
 * Represents the OVSDB plugin component in charge of managing connections to
 * OVSDB nodes and relaying their table updates to the inventory service.
62 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
/** Logger used throughout this service. */
protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
/** Port used when no listen port is configured and when connect() receives no usable port. */
private static final Integer defaultOvsdbPort = 6640;
/** Port the passive listener binds to; reassigned from the "ovsdb.listenPort" system property during initialization. */
private static Integer ovsdbListenPort = defaultOvsdbPort;
/** Known connections, keyed by connection identifier. */
private ConcurrentMap<String, Connection> ovsdbConnections;
/** Optional custom pipeline handlers for active connections; null selects the default decoder/encoder pair. */
private List<ChannelHandler> handlers = null;
/** Inventory service that is told about node additions, removals and table updates. */
private InventoryServiceInternal inventoryServiceInternal;
/** Channel of the passive listener; remains null until the bind in ovsdbManager() has completed. */
private Channel serverListenChannel = null;
72 public InventoryServiceInternal getInventoryServiceInternal() {
73 return inventoryServiceInternal;
76 public void setInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
77 this.inventoryServiceInternal = inventoryServiceInternal;
80 public void unsetInventoryServiceInternal(InventoryServiceInternal inventoryServiceInternal) {
81 if (this.inventoryServiceInternal == inventoryServiceInternal) {
82 this.inventoryServiceInternal = null;
87 ovsdbConnections = new ConcurrentHashMap<String, Connection>();
88 int listenPort = defaultOvsdbPort;
89 String portString = System.getProperty("ovsdb.listenPort");
90 if (portString != null) {
91 listenPort = Integer.decode(portString).intValue();
93 ovsdbListenPort = listenPort;
 * Function called by the dependency manager when at least one dependency
 * becomes unsatisfied or when the component is shutting down because, for
 * example, the bundle is being stopped.
 * Function called by the dependency manager after "init ()" is called and after
 * the services provided by the class are registered in the service registry
 * Function called by the dependency manager before the services exported by
 * the component are unregistered; this will be followed by a "destroy ()"
// Tear down every registered node connection first.
for (Connection connection : ovsdbConnections.values()) {
    connection.disconnect();
}
// The listener channel is only assigned once the bind in ovsdbManager() has
// succeeded, so it can still be null when shutdown happens early.
if (serverListenChannel != null) {
    serverListenChannel.disconnect();
}
125 public Status disconnect(Node node) {
126 String identifier = (String) node.getID();
127 Connection connection = ovsdbConnections.get(identifier);
128 if (connection != null) {
129 ovsdbConnections.remove(identifier);
130 return connection.disconnect();
132 return new Status(StatusCode.NOTFOUND);
137 public Node connect(String identifier, Map<ConnectionConstants, String> params) {
142 address = InetAddress.getByName(params.get(ConnectionConstants.ADDRESS));
143 } catch (Exception e) {
149 port = Integer.parseInt(params.get(ConnectionConstants.PORT));
150 if (port == 0) port = defaultOvsdbPort;
151 } catch (Exception e) {
152 port = defaultOvsdbPort;
156 Bootstrap bootstrap = new Bootstrap();
157 bootstrap.group(new NioEventLoopGroup());
158 bootstrap.channel(NioSocketChannel.class);
159 bootstrap.option(ChannelOption.TCP_NODELAY, true);
160 bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
162 bootstrap.handler(new ChannelInitializer<SocketChannel>() {
164 public void initChannel(SocketChannel channel) throws Exception {
165 if (handlers == null) {
166 channel.pipeline().addLast(
167 //new LoggingHandler(LogLevel.INFO),
168 new JsonRpcDecoder(100000),
169 new StringEncoder(CharsetUtil.UTF_8));
171 for (ChannelHandler handler : handlers) {
172 channel.pipeline().addLast(handler);
178 ChannelFuture future = bootstrap.connect(address, port).sync();
179 Channel channel = future.channel();
180 return handleNewConnection(identifier, channel, this);
181 } catch (Exception e) {
/**
 * Accessor for the custom channel handlers of active connections.
 * NOTE(review): presumably returns the list stored by setHandlers — confirm against the body.
 */
public List<ChannelHandler> getHandlers() {
191 public void setHandlers(List<ChannelHandler> handlers) {
192 this.handlers = handlers;
196 public Connection getConnection(Node node) {
197 String identifier = (String) node.getID();
198 return ovsdbConnections.get(identifier);
202 public void notifyClusterViewChanged() {
206 public void notifyNodeDisconnectFromMaster(Node arg0) {
/**
 * Wraps a freshly opened channel in a Connection, attaches the JSON-RPC binder
 * to its pipeline, registers the connection under {@code identifier}, hooks a
 * close listener onto the channel and starts inventory initialization.
 *
 * @param identifier key under which the connection is stored in ovsdbConnections
 * @param channel channel to the OVSDB node
 * @param instance callback receiver registered on the OvsdbRPC client
 * @throws InterruptedException declared by the signature
 * @throws ExecutionException declared by the signature
 */
private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
    Connection connection = new Connection(identifier, channel);
    Node node = connection.getNode();

    // Lenient mapper: unknown JSON properties are tolerated on read and null
    // values are left out on write.
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    objectMapper.setSerializationInclusion(Include.NON_NULL);

    // The binder handler is appended after whatever the channel initializer installed.
    JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
    JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
    binderHandler.setNode(node);
    channel.pipeline().addLast(binderHandler);

    // Create the RPC client for this node and register the connection before
    // inventory initialization begins.
    OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
    connection.setRpc(ovsdb);
    ovsdb.registerCallback(instance);
    ovsdbConnections.put(identifier, connection);

    // The handler is attached to the channel's close future so it is notified
    // when the channel closes.
    ChannelConnectionHandler handler = new ChannelConnectionHandler();
    handler.setNode(node);
    handler.setConnectionService(this);
    ChannelFuture closeFuture = channel.closeFuture();
    closeFuture.addListener(handler);
    // Keeping the initial inventory update(s) on its own thread.
            Connection connection;
                    initializeInventoryForNewNode(connection);
                } catch (Exception e) {
                    // Initialization failed: forget the connection again.
                    ovsdbConnections.remove(identifier);
            // Hands the identifier and connection to the thread object before it is started.
            public Thread initializeConnectionParams(String identifier, Connection connection) {
                this.identifier = identifier;
                this.connection = connection;
        }.initializeConnectionParams(identifier, connection).start();
255 public void channelClosed(Node node) throws Exception {
256 logger.error("Connection to Node : {} closed", node);
257 inventoryServiceInternal.removeNode(node);
/**
 * Publishes a newly connected node to the inventory service, loads its database
 * schema, and sets up a monitor whose initial reply is processed as the first
 * table update.
 *
 * @param connection connection whose channel, node and RPC client are used
 * @throws InterruptedException if waiting on one of the RPC futures is interrupted
 * @throws ExecutionException if one of the RPC futures completes exceptionally
 * @throws RuntimeException if the monitor reply carries an error
 */
private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
    Channel channel = connection.getChannel();
    // Remote endpoint of the channel, wrapped as node properties.
    InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
    int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
    IPAddressProperty addressProp = new IPAddressProperty(address);
    L4PortProperty l4Port = new L4PortProperty(port);
    Set<Property> props = new HashSet<Property>();
    props.add(addressProp);
    inventoryServiceInternal.addNode(connection.getNode(), props);

    // Request the Open_vSwitch schema; get() blocks until the reply is available.
    List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
    ListenableFuture<DatabaseSchema> dbSchemaF = connection.getRpc().get_schema(dbNames);
    DatabaseSchema databaseSchema = dbSchemaF.get();
    inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);

    // Monitor only those known tables that the node's schema actually contains.
    MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
    for (Table<?> table : Tables.getTables()) {
        if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
            monitorReq.monitor(table);
            logger.warn("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());

    // Send the monitor request and wait for its reply.
    ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
    TableUpdates updates = monResponse.get();
    if (updates.getError() != null) {
        logger.error("Error configuring monitor, error : {}, details : {}",
                updates.getDetails());
        /* FIXME: This should be cause for alarm */
        throw new RuntimeException("Failed to setup a monitor in OVSDB");
    // Feed the monitor reply through the regular update path.
    UpdateNotification monitor = new UpdateNotification();
    monitor.setUpdate(updates);
    this.update(connection.getNode(), monitor);
/**
 * Entry point for bringing up the passive OVSDB listener.
 * NOTE(review): presumably runs ovsdbManager() off the calling thread, since
 * that method blocks until the server channel is closed — confirm against the body.
 */
private void startOvsdbManager() {
308 private void ovsdbManager() {
309 EventLoopGroup bossGroup = new NioEventLoopGroup();
310 EventLoopGroup workerGroup = new NioEventLoopGroup();
312 ServerBootstrap b = new ServerBootstrap();
313 b.group(bossGroup, workerGroup)
314 .channel(NioServerSocketChannel.class)
315 .option(ChannelOption.SO_BACKLOG, 100)
316 .handler(new LoggingHandler(LogLevel.INFO))
317 .childHandler(new ChannelInitializer<SocketChannel>() {
319 public void initChannel(SocketChannel channel) throws Exception {
320 logger.debug("New Passive channel created : "+ channel.toString());
321 InetAddress address = channel.remoteAddress().getAddress();
322 int port = channel.remoteAddress().getPort();
323 String identifier = address.getHostAddress()+":"+port;
324 channel.pipeline().addLast(
325 new LoggingHandler(LogLevel.INFO),
326 new JsonRpcDecoder(100000),
327 new StringEncoder(CharsetUtil.UTF_8));
329 Node node = handleNewConnection(identifier, channel, ConnectionService.this);
330 logger.debug("Connected Node : "+node.toString());
333 b.option(ChannelOption.TCP_NODELAY, true);
334 b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
336 ChannelFuture f = b.bind(ovsdbListenPort).sync();
337 serverListenChannel = f.channel();
338 // Wait until the server socket is closed.
339 serverListenChannel.closeFuture().sync();
340 } catch (InterruptedException e) {
343 // Shut down all event loops to terminate all threads.
344 bossGroup.shutdownGracefully();
345 workerGroup.shutdownGracefully();
350 public void update(Node node, UpdateNotification updateNotification) {
351 if (updateNotification == null) return;
352 inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());
356 public void locked(Node node, List<String> ids) {
357 // TODO Auto-generated method stub
361 public void stolen(Node node, List<String> ids) {
362 // TODO Auto-generated method stub