package org.opendaylight.ovsdb.plugin;
import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
- private static final Integer defaultOvsdbPort = 6632;
+ private static final Integer defaultOvsdbPort = 6640;
+ private static Integer ovsdbListenPort = defaultOvsdbPort;
private ConcurrentMap<String, Connection> ovsdbConnections;
private List<ChannelHandler> handlers = null;
private InventoryServiceInternal inventoryServiceInternal;
+ private Channel serverListenChannel = null;
public InventoryServiceInternal getInventoryServiceInternal() {
return inventoryServiceInternal;
public void init() {
ovsdbConnections = new ConcurrentHashMap<String, Connection>();
+ int listenPort = defaultOvsdbPort;
+ String portString = System.getProperty("ovsdb.listenPort");
+ if (portString != null) {
+ listenPort = Integer.decode(portString).intValue();
+ }
+ ovsdbListenPort = listenPort;
}
/**
* the services provided by the class are registered in the service registry
*/
void start() {
+ startOvsdbManager();
}
/**
for (Connection connection : ovsdbConnections.values()) {
connection.disconnect();
}
+ serverListenChannel.disconnect();
}
@Override
});
ChannelFuture future = bootstrap.connect(address, port).sync();
-
Channel channel = future.channel();
- Connection connection = new Connection(identifier, channel);
- Node node = connection.getNode();
-
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- objectMapper.setSerializationInclusion(Include.NON_NULL);
-
- JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
- JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
- binderHandler.setNode(node);
- channel.pipeline().addLast(binderHandler);
-
- OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
- connection.setRpc(ovsdb);
- ovsdb.registerCallback(this);
-
- handleNewConnection(connection, address, port);
- ovsdbConnections.put(identifier, connection);
- return node;
+ return handleNewConnection(identifier, channel, this);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
-
public List<ChannelHandler> getHandlers() {
return handlers;
}
public void notifyNodeDisconnectFromMaster(Node arg0) {
}
- private void handleNewConnection (Connection connection, InetAddress address, int port) throws InterruptedException, ExecutionException {
+ private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
+ Connection connection = new Connection(identifier, channel);
+ Node node = connection.getNode();
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ objectMapper.setSerializationInclusion(Include.NON_NULL);
+
+ JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
+ JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
+ binderHandler.setNode(node);
+ channel.pipeline().addLast(binderHandler);
+
+ OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
+ connection.setRpc(ovsdb);
+ ovsdb.registerCallback(instance);
+
+ // Keeping the Initial inventory update(s) on its own thread.
+ new Thread() {
+ Connection connection;
+ String identifier;
+
+ public void run() {
+ try {
+ initializeInventoryForNewNode(connection);
+ ovsdbConnections.put(identifier, connection);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ public Thread initializeConnectionParams(String identifier, Connection connection) {
+ this.identifier = identifier;
+ this.connection = connection;
+ return this;
+ }
+ }.initializeConnectionParams(identifier, connection).start();
+ return node;
+ }
+
+ private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
+ Channel channel = connection.getChannel();
+ InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
+ int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
IPAddressProperty addressProp = new IPAddressProperty(address);
L4PortProperty l4Port = new L4PortProperty(port);
inventoryServiceInternal.addNodeProperty(connection.getNode(), addressProp);
this.update(connection.getNode(), monitor);
}
+ private void startOvsdbManager() {
+ new Thread() {
+ public void run() {
+ ovsdbManager();
+ }
+ }.start();
+ }
+
+ private void ovsdbManager() {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 100)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel channel) throws Exception {
+ logger.debug("New Passive channel created : "+ channel.toString());
+ InetAddress address = channel.remoteAddress().getAddress();
+ int port = channel.remoteAddress().getPort();
+ String identifier = address.getHostAddress()+":"+port;
+ channel.pipeline().addLast(
+ new LoggingHandler(LogLevel.INFO),
+ new JsonRpcDecoder(100000),
+ new StringEncoder(CharsetUtil.UTF_8));
+
+ Node node = handleNewConnection(identifier, channel, ConnectionService.this);
+ logger.debug("Connected Node : "+node.toString());
+ }
+ });
+ b.option(ChannelOption.TCP_NODELAY, true);
+ b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+ // Start the server.
+ ChannelFuture f = b.bind(ovsdbListenPort).sync();
+ serverListenChannel = f.channel();
+ // Wait until the server socket is closed.
+ serverListenChannel.closeFuture().sync();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ // Shut down all event loops to terminate all threads.
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+ }
+
@Override
public void update(Node node, UpdateNotification updateNotification) {
inventoryServiceInternal.processTableUpdates(node, updateNotification.getUpdate());