OVSDB Manager Passive listen support 32/2332/1
authorMadhu Venugopal <mavenugo@gmail.com>
Sun, 3 Nov 2013 10:53:39 +0000 (05:53 -0500)
committerMadhu Venugopal <mavenugo@gmail.com>
Sun, 3 Nov 2013 10:53:39 +0000 (05:53 -0500)
This change brings in the following :
1. ovsdb manager passive listen on port 6640. With this change, the OVSDB-server can connect to the controller via set-manager configuration.
2. Default listening and connecting port is changed to 6640 (as per the IANA port assignment)
3. Reorganized ConnectionManager a bit to share the exact same Channel handling code for both Active and Passive ovsdb connections.

Change-Id: I76836aa53b519af08aaf3399ba104351d94ddd4c
Signed-off-by: Madhu Venugopal <mavenugo@gmail.com>
ovsdb/src/main/java/org/opendaylight/ovsdb/plugin/ConnectionService.java

index 7fc0342866d831e22b5bab921df27571b32bd35f..a8e7acc56f89b2ce2180f78f7c210a365f60c6f9 100755 (executable)
@@ -1,9 +1,11 @@
 package org.opendaylight.ovsdb.plugin;
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.*;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 import io.netty.handler.logging.LogLevel;
@@ -35,6 +37,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -50,10 +53,12 @@ import java.util.concurrent.ExecutionException;
 public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
     protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
 
-    private static final Integer defaultOvsdbPort = 6632;
+    private static final Integer defaultOvsdbPort = 6640;
+    private static Integer ovsdbListenPort = defaultOvsdbPort;
     private ConcurrentMap<String, Connection> ovsdbConnections;
     private List<ChannelHandler> handlers = null;
     private InventoryServiceInternal inventoryServiceInternal;
+    private Channel serverListenChannel = null;
 
     public InventoryServiceInternal getInventoryServiceInternal() {
         return inventoryServiceInternal;
@@ -71,6 +76,12 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
 
     public void init() {
         ovsdbConnections = new ConcurrentHashMap<String, Connection>();
+        int listenPort = defaultOvsdbPort;
+        String portString = System.getProperty("ovsdb.listenPort");
+        if (portString != null) {
+            listenPort = Integer.decode(portString).intValue();
+        }
+        ovsdbListenPort = listenPort;
     }
 
     /**
@@ -86,6 +97,7 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
      * the services provided by the class are registered in the service registry
      */
     void start() {
+        startOvsdbManager();
     }
 
     /**
@@ -97,6 +109,7 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         for (Connection connection : ovsdbConnections.values()) {
             connection.disconnect();
         }
+        serverListenChannel.disconnect();
     }
 
     @Override
@@ -154,34 +167,14 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
             });
 
             ChannelFuture future = bootstrap.connect(address, port).sync();
-
             Channel channel = future.channel();
-            Connection connection = new Connection(identifier, channel);
-            Node node = connection.getNode();
-
-            ObjectMapper objectMapper = new ObjectMapper();
-            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            objectMapper.setSerializationInclusion(Include.NON_NULL);
-
-            JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
-            JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
-            binderHandler.setNode(node);
-            channel.pipeline().addLast(binderHandler);
-
-            OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
-            connection.setRpc(ovsdb);
-            ovsdb.registerCallback(this);
-
-            handleNewConnection(connection, address, port);
-            ovsdbConnections.put(identifier, connection);
-            return node;
+            return handleNewConnection(identifier, channel, this);
         } catch (Exception e) {
             e.printStackTrace();
         }
         return null;
     }
 
-
     public List<ChannelHandler> getHandlers() {
         return handlers;
     }
@@ -204,7 +197,51 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
     public void notifyNodeDisconnectFromMaster(Node arg0) {
     }
 
-    private void handleNewConnection (Connection connection, InetAddress address, int port) throws InterruptedException, ExecutionException {
+    private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
+        Connection connection = new Connection(identifier, channel);
+        Node node = connection.getNode();
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        objectMapper.setSerializationInclusion(Include.NON_NULL);
+
+        JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
+        JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
+        binderHandler.setNode(node);
+        channel.pipeline().addLast(binderHandler);
+
+        OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
+        connection.setRpc(ovsdb);
+        ovsdb.registerCallback(instance);
+
+        // Keeping the Initial inventory update(s) on its own thread.
+        new Thread() {
+            Connection connection;
+            String identifier;
+
+            public void run() {
+                try {
+                    initializeInventoryForNewNode(connection);
+                    ovsdbConnections.put(identifier, connection);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (ExecutionException e) {
+                    e.printStackTrace();
+                }
+            }
+            public Thread initializeConnectionParams(String identifier, Connection connection) {
+                this.identifier = identifier;
+                this.connection = connection;
+                return this;
+            }
+        }.initializeConnectionParams(identifier, connection).start();
+        return node;
+    }
+
+    private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
+        Channel channel = connection.getChannel();
+        InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
+        int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
         IPAddressProperty addressProp = new IPAddressProperty(address);
         L4PortProperty l4Port = new L4PortProperty(port);
         inventoryServiceInternal.addNodeProperty(connection.getNode(), addressProp);
@@ -238,6 +275,55 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         this.update(connection.getNode(), monitor);
     }
 
+    private void startOvsdbManager() {
+        new Thread() {
+            public void run() {
+                ovsdbManager();
+            }
+        }.start();
+    }
+
+    private void ovsdbManager() {
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup)
+             .channel(NioServerSocketChannel.class)
+             .option(ChannelOption.SO_BACKLOG, 100)
+             .handler(new LoggingHandler(LogLevel.INFO))
+             .childHandler(new ChannelInitializer<SocketChannel>() {
+                 @Override
+                 public void initChannel(SocketChannel channel) throws Exception {
+                     logger.debug("New Passive channel created : "+ channel.toString());
+                     InetAddress address = channel.remoteAddress().getAddress();
+                     int port = channel.remoteAddress().getPort();
+                     String identifier = address.getHostAddress()+":"+port;
+                     channel.pipeline().addLast(
+                             new LoggingHandler(LogLevel.INFO),
+                             new JsonRpcDecoder(100000),
+                             new StringEncoder(CharsetUtil.UTF_8));
+
+                     Node node = handleNewConnection(identifier, channel, ConnectionService.this);
+                     logger.debug("Connected Node : "+node.toString());
+                 }
+             });
+            b.option(ChannelOption.TCP_NODELAY, true);
+            b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+            // Start the server.
+            ChannelFuture f = b.bind(ovsdbListenPort).sync();
+            serverListenChannel =  f.channel();
+            // Wait until the server socket is closed.
+            serverListenChannel.closeFuture().sync();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } finally {
+            // Shut down all event loops to terminate all threads.
+            bossGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+        }
+    }
+
     @Override
     public void update(Node node, UpdateNotification updateNotification) {
         inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());