From f3905fd885aee2d654bb95c31ab2a80eed0df2f4 Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Sun, 3 Nov 2013 05:53:39 -0500 Subject: [PATCH] OVSDB Manager Passive listen support This change brings in the following : 1. ovsdb manager passive listen on port 6640. With this change, the OVSDB-server can connect to the controller via set-manager configuration. 2. Default listening and connecting port is changed to 6640 (as per the IANA port assignment) 3. Reorganized ConnectionManager a bit to share the exact same Channel handling code for both Active and Passive ovsdb connections. Change-Id: I76836aa53b519af08aaf3399ba104351d94ddd4c Signed-off-by: Madhu Venugopal --- .../ovsdb/plugin/ConnectionService.java | 132 +++++++++++++++--- 1 file changed, 109 insertions(+), 23 deletions(-) diff --git a/ovsdb/src/main/java/org/opendaylight/ovsdb/plugin/ConnectionService.java b/ovsdb/src/main/java/org/opendaylight/ovsdb/plugin/ConnectionService.java index 7fc0342866..a8e7acc56f 100755 --- a/ovsdb/src/main/java/org/opendaylight/ovsdb/plugin/ConnectionService.java +++ b/ovsdb/src/main/java/org/opendaylight/ovsdb/plugin/ConnectionService.java @@ -1,9 +1,11 @@ package org.opendaylight.ovsdb.plugin; import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; @@ -35,6 +37,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ListenableFuture; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -50,10 +53,12 @@ import java.util.concurrent.ExecutionException; public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback { protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class); - private static final Integer defaultOvsdbPort = 6632; + private static final Integer defaultOvsdbPort = 6640; + private static Integer ovsdbListenPort = defaultOvsdbPort; private ConcurrentMap ovsdbConnections; private List handlers = null; private InventoryServiceInternal inventoryServiceInternal; + private Channel serverListenChannel = null; public InventoryServiceInternal getInventoryServiceInternal() { return inventoryServiceInternal; @@ -71,6 +76,12 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio public void init() { ovsdbConnections = new ConcurrentHashMap(); + int listenPort = defaultOvsdbPort; + String portString = System.getProperty("ovsdb.listenPort"); + if (portString != null) { + listenPort = Integer.decode(portString).intValue(); + } + ovsdbListenPort = listenPort; } /** @@ -86,6 +97,7 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio * the services provided by the class are registered in the service registry */ void start() { + startOvsdbManager(); } /** @@ -97,6 +109,7 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio for (Connection connection : ovsdbConnections.values()) { connection.disconnect(); } + serverListenChannel.disconnect(); } @Override @@ -154,34 +167,14 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio }); ChannelFuture future = bootstrap.connect(address, port).sync(); - Channel channel = future.channel(); - Connection connection = new Connection(identifier, channel); - Node node = connection.getNode(); - - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - objectMapper.setSerializationInclusion(Include.NON_NULL); - - JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel); - JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory); - binderHandler.setNode(node); - channel.pipeline().addLast(binderHandler); - - OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class); - connection.setRpc(ovsdb); - ovsdb.registerCallback(this); - - handleNewConnection(connection, address, port); - ovsdbConnections.put(identifier, connection); - return node; + return handleNewConnection(identifier, channel, this); } catch (Exception e) { e.printStackTrace(); } return null; } - public List getHandlers() { return handlers; } @@ -204,7 +197,51 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio public void notifyNodeDisconnectFromMaster(Node arg0) { } - private void handleNewConnection (Connection connection, InetAddress address, int port) throws InterruptedException, ExecutionException { + private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException { + Connection connection = new Connection(identifier, channel); + Node node = connection.getNode(); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.setSerializationInclusion(Include.NON_NULL); + + JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel); + JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory); + binderHandler.setNode(node); + channel.pipeline().addLast(binderHandler); + + OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class); + connection.setRpc(ovsdb); + ovsdb.registerCallback(instance); + + // Keeping the Initial inventory update(s) on its own thread. + new Thread() { + Connection connection; + String identifier; + + public void run() { + try { + initializeInventoryForNewNode(connection); + ovsdbConnections.put(identifier, connection); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + public Thread initializeConnectionParams(String identifier, Connection connection) { + this.identifier = identifier; + this.connection = connection; + return this; + } + }.initializeConnectionParams(identifier, connection).start(); + return node; + } + + private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException { + Channel channel = connection.getChannel(); + InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress(); + int port = ((InetSocketAddress)channel.remoteAddress()).getPort(); IPAddressProperty addressProp = new IPAddressProperty(address); L4PortProperty l4Port = new L4PortProperty(port); inventoryServiceInternal.addNodeProperty(connection.getNode(), addressProp); @@ -238,6 +275,55 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio this.update(connection.getNode(), monitor); } + private void startOvsdbManager() { + new Thread() { + public void run() { + ovsdbManager(); + } + }.start(); + } + + private void ovsdbManager() { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + logger.debug("New Passive channel created : "+ channel.toString()); + InetAddress address = channel.remoteAddress().getAddress(); + int port = channel.remoteAddress().getPort(); + String identifier = address.getHostAddress()+":"+port; + channel.pipeline().addLast( + new LoggingHandler(LogLevel.INFO), + new JsonRpcDecoder(100000), + new StringEncoder(CharsetUtil.UTF_8)); + + Node node = handleNewConnection(identifier, channel, ConnectionService.this); + logger.debug("Connected Node : "+node.toString()); + } + }); + b.option(ChannelOption.TCP_NODELAY, true); + b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535)); + // Start the server. + ChannelFuture f = b.bind(ovsdbListenPort).sync(); + serverListenChannel = f.channel(); + // Wait until the server socket is closed. + serverListenChannel.closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + // Shut down all event loops to terminate all threads. + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + @Override public void update(Node node, UpdateNotification updateNotification) { inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate()); -- 2.36.6