Remove ThreadPool in connection release path
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPDispatcherImpl.java
index f734babeb93f00ac93a3aba77cd05a9361caf46d..e7fd673692e7ab775cc5d95f131d6a25c0ef71d3 100644 (file)
@@ -18,6 +18,7 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollChannelOption;
@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
 public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
     private static final int SOCKET_BACKLOG_SIZE = 128;
+    private static final int FIX_BUFFER_SIZE = 1;
     private static final long TIMEOUT = 10;
 
     private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024);
@@ -62,7 +64,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
     private final BGPPeerRegistry bgpPeerRegistry;
 
     public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup,
-        final EventLoopGroup workerGroup, final BGPPeerRegistry bgpPeerRegistry) {
+            final EventLoopGroup workerGroup, final BGPPeerRegistry bgpPeerRegistry) {
         if (Epoll.isAvailable()) {
             this.bossGroup = new EpollEventLoopGroup();
             this.workerGroup = new EpollEventLoopGroup();
@@ -74,16 +76,13 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         this.handlerFactory = new BGPHandlerFactory(messageRegistry);
     }
 
-    @Override
-    public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress, final int retryTimer) {
-        return createClient(remoteAddress, retryTimer, createClientBootStrap(KeyMapping.getKeyMapping(), false));
-    }
-
-    private synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress,
-        final int retryTimer, final Bootstrap clientBootStrap) {
+    @VisibleForTesting
+    public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
+            final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
+        final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.getKeyMapping(), reuseAddress, localAddress);
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
         final ChannelPipelineInitializer<BGPSessionImpl> initializer = BGPChannel.createChannelPipelineInitializer(
-            this.handlerFactory, snf);
+                this.handlerFactory, snf);
 
         final BGPProtocolSessionPromise<BGPSessionImpl> sessionPromise = new BGPProtocolSessionPromise<>(remoteAddress,
                 retryTimer, clientBootStrap, this.bgpPeerRegistry);
@@ -93,15 +92,8 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         return sessionPromise;
     }
 
-    @VisibleForTesting
-    public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
-        final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
-        final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.getKeyMapping(), reuseAddress);
-        clientBootStrap.localAddress(localAddress);
-        return createClient(remoteAddress, retryTimer, clientBootStrap);
-    }
-
-    private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress) {
+    private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress,
+            final InetSocketAddress localAddress) {
         final Bootstrap bootstrap = new Bootstrap();
         if (Epoll.isAvailable()) {
             bootstrap.channel(EpollSocketChannel.class);
@@ -118,20 +110,21 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         }
 
         // Make sure we are doing round-robin processing
-        bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+        bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(FIX_BUFFER_SIZE));
         bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
         bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
         bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
 
-        if (bootstrap.group() == null) {
+        if (bootstrap.config().group() == null) {
             bootstrap.group(this.workerGroup);
         }
+        bootstrap.localAddress(localAddress);
 
         return bootstrap;
     }
 
     @Override
-    public synchronized void close() throws InterruptedException {
+    public synchronized void close() {
         if (Epoll.isAvailable()) {
             LOG.debug("Closing Dispatcher");
             this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
@@ -141,20 +134,19 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
 
     @Override
     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
-            final int retryTimer, final KeyMapping keys) {
-        return createReconnectingClient(remoteAddress, retryTimer, keys, null, false);
+            final InetSocketAddress localAddress, final int retryTimer, final KeyMapping keys) {
+        return createReconnectingClient(remoteAddress, retryTimer, keys, localAddress, false);
     }
 
     @VisibleForTesting
-    protected synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
-        final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress,
-        final boolean reuseAddress) {
+    synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
+            final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress,
+            final boolean reuseAddress) {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
-        final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress);
-        bootstrap.localAddress(localAddress);
+        final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress, localAddress);
         final BGPReconnectPromise<?> reconnectPromise = new BGPReconnectPromise<>(GlobalEventExecutor.INSTANCE,
-            remoteAddress, retryTimer, bootstrap, this.bgpPeerRegistry,
-            BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf));
+                remoteAddress, retryTimer, bootstrap, this.bgpPeerRegistry,
+                BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf));
         reconnectPromise.connect();
         return reconnectPromise;
     }
@@ -162,7 +154,8 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
     @Override
     public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) {
         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(this.bgpPeerRegistry);
-        final ChannelPipelineInitializer<?> initializer = BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf);
+        final ChannelPipelineInitializer<?> initializer = BGPChannel.
+                createChannelPipelineInitializer(this.handlerFactory, snf);
         final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
         final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
         LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
@@ -174,7 +167,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         return this.bgpPeerRegistry;
     }
 
-    private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
+    private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer<?> initializer) {
         final ServerBootstrap serverBootstrap = new ServerBootstrap();
         if (Epoll.isAvailable()) {
             serverBootstrap.channel(EpollServerSocketChannel.class);
@@ -190,9 +183,9 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
 
         // Make sure we are doing round-robin processing
-        serverBootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(FIX_BUFFER_SIZE));
 
-        if (serverBootstrap.group() == null) {
+        if (serverBootstrap.config().group() == null) {
             serverBootstrap.group(this.bossGroup, this.workerGroup);
         }
         return serverBootstrap;
@@ -224,11 +217,12 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
             };
         }
 
-        static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) {
+        static <S extends BGPSession> ChannelHandler createServerChannelHandler(
+                final ChannelPipelineInitializer<S> initializer) {
             return new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(final SocketChannel channel) {
-                    initializer.initializeChannel(channel, new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
+                    initializer.initializeChannel(channel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
                 }
             };
         }