Refactor BmpDispatcherUtil 11/110411/2
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 1 Mar 2024 10:44:33 +0000 (11:44 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 1 Mar 2024 11:12:22 +0000 (12:12 +0100)
Rather than having a utility class and dealing with epoll/nio
bifurbication, promote BmpDispatcherUtil into a component.

Change-Id: I3e1506967d13e787cb9d9d2e8827646b1d9ba3d8
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpDispatcherImpl.java
bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpDispatcherUtil.java [deleted file]
bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpNettyGroups.java [new file with mode: 0644]
bmp/bmp-impl/src/test/java/org/opendaylight/protocol/bmp/impl/app/BmpMonitorImplTest.java
bmp/bmp-impl/src/test/java/org/opendaylight/protocol/bmp/impl/session/BmpDispatcherImplTest.java
bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockDispatcher.java
bmp/bmp-mock/src/test/java/org/opendaylight/protocol/bmp/mock/BmpMockDispatcherTest.java
bmp/bmp-mock/src/test/java/org/opendaylight/protocol/bmp/mock/BmpMockTest.java

index c62d89fd2a76a12651d50c800e7c789ab670a46f..6ff762c14d46ea9f670abc80ab83c9a397e9e0d5 100644 (file)
@@ -8,17 +8,12 @@
 package org.opendaylight.protocol.bmp.impl;
 
 import static java.util.Objects.requireNonNull;
-import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createClientBootstrap;
-import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createServerBootstrap;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
 import java.net.InetSocketAddress;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -44,37 +39,27 @@ public class BmpDispatcherImpl implements BmpDispatcher, AutoCloseable {
     private static final int CONNECT_TIMEOUT = 5000;
     private static final int INITIAL_BACKOFF = 30_000;
     private static final int MAXIMUM_BACKOFF = 720_000;
-    private static final long TIMEOUT = 10;
 
     private final BmpHandlerFactory hf;
-    private final EventLoopGroup bossGroup;
-    private final EventLoopGroup workerGroup;
+    private final BmpNettyGroups nettyGroups;
     private final BmpSessionFactory sessionFactory;
     @GuardedBy("this")
     private boolean close;
 
+
     @Activate
-    public BmpDispatcherImpl(
-            @Reference(target = "(type=global-boss-group)") final EventLoopGroup bossGroup,
-            @Reference(target = "(type=global-worker-group)") final EventLoopGroup workerGroup,
+    public BmpDispatcherImpl(@Reference final BmpNettyGroups nettyGroups,
             @Reference final BmpExtensionConsumerContext ctx, @Reference final BmpSessionFactory sessionFactory) {
-        if (Epoll.isAvailable()) {
-            this.bossGroup = new EpollEventLoopGroup();
-            this.workerGroup = new EpollEventLoopGroup();
-        } else {
-            this.bossGroup = requireNonNull(bossGroup);
-            this.workerGroup = requireNonNull(workerGroup);
-        }
-        this.hf = new BmpHandlerFactory(ctx.getBmpMessageRegistry());
+        this.nettyGroups = requireNonNull(nettyGroups);
+        hf = new BmpHandlerFactory(ctx.getBmpMessageRegistry());
         this.sessionFactory = requireNonNull(sessionFactory);
     }
 
     @Override
     public ChannelFuture createClient(final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf,
             final KeyMapping keys) {
-        final Bootstrap bootstrap = createClientBootstrap(this.sessionFactory, this.hf,
-                BmpDispatcherUtil::createChannelWithDecoder, slf, remoteAddress, this.workerGroup,
-                CONNECT_TIMEOUT, keys);
+        final Bootstrap bootstrap = nettyGroups.createClientBootstrap(sessionFactory, hf,
+                BmpNettyGroups::createChannelWithDecoder, slf, remoteAddress, CONNECT_TIMEOUT, keys);
         final ChannelFuture channelPromise = bootstrap.connect();
         channelPromise.addListener(new BootstrapListener(bootstrap, remoteAddress, slf, keys));
         LOG.debug("Initiated BMP Client {} at {}.", channelPromise, remoteAddress);
@@ -84,8 +69,8 @@ public class BmpDispatcherImpl implements BmpDispatcher, AutoCloseable {
     @Override
     public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf,
             final KeyMapping keys) {
-        final ServerBootstrap serverBootstrap = createServerBootstrap(this.sessionFactory, this.hf, slf,
-                BmpDispatcherUtil::createChannelWithDecoder, this.bossGroup, this.workerGroup, keys);
+        final ServerBootstrap serverBootstrap = nettyGroups.createServerBootstrap(sessionFactory, hf, slf,
+                BmpNettyGroups::createChannelWithDecoder, keys);
         final ChannelFuture channelFuture = serverBootstrap.bind(address);
         LOG.debug("Initiated BMP server {} at {}.", channelFuture, address);
         return channelFuture;
@@ -95,11 +80,7 @@ public class BmpDispatcherImpl implements BmpDispatcher, AutoCloseable {
     @PreDestroy
     @Override
     public synchronized void close() {
-        this.close = true;
-        if (Epoll.isAvailable()) {
-            this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
-            this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
-        }
+        close = true;
     }
 
     private class BootstrapListener implements ChannelFutureListener {
@@ -114,7 +95,7 @@ public class BmpDispatcherImpl implements BmpDispatcher, AutoCloseable {
                 final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf, final KeyMapping keys) {
             this.bootstrap = bootstrap;
             this.remoteAddress = remoteAddress;
-            this.delay = INITIAL_BACKOFF;
+            delay = INITIAL_BACKOFF;
             this.slf = slf;
             this.keys = keys;
         }
@@ -127,27 +108,26 @@ public class BmpDispatcherImpl implements BmpDispatcher, AutoCloseable {
                 LOG.debug("Connection {} succeeded!", future);
                 future.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> scheduleConnect());
             } else {
-                if (this.delay > MAXIMUM_BACKOFF) {
+                if (delay > MAXIMUM_BACKOFF) {
                     LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP "
-                            + "router {}.", this.remoteAddress);
+                            + "router {}.", remoteAddress);
                     future.cancel(false);
                     return;
                 }
                 final EventLoop loop = future.channel().eventLoop();
-                loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
+                loop.schedule(() -> bootstrap.connect().addListener(this), delay, TimeUnit.MILLISECONDS);
                 LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
-                        this.remoteAddress, this.delay);
-                this.delay *= 2;
+                        remoteAddress, delay);
+                delay *= 2;
             }
         }
 
         private void scheduleConnect() {
-            if (!BmpDispatcherImpl.this.close) {
+            if (!close) {
                 timer.schedule(new TimerTask() {
                     @Override
                     public void run() {
-                        createClient(BootstrapListener.this.remoteAddress, BootstrapListener.this.slf,
-                                BootstrapListener.this.keys);
+                        createClient(remoteAddress, slf, keys);
                     }
                 }, 5);
             }
diff --git a/bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpDispatcherUtil.java b/bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpDispatcherUtil.java
deleted file mode 100644 (file)
index ba89ad1..0000000
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.protocol.bmp.impl;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AbstractChannel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollChannelOption;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
-import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
-import org.opendaylight.protocol.concepts.KeyMapping;
-
-public final class BmpDispatcherUtil {
-    private static final int MAX_CONNECTIONS_COUNT = 128;
-
-    private BmpDispatcherUtil() {
-        // Hidden on purpose
-    }
-
-    public static ChannelInitializer<AbstractChannel> createChannelWithDecoder(
-            final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
-            final @NonNull BmpSessionListenerFactory slf) {
-        return new ChannelInitializer<>() {
-            @Override
-            protected void initChannel(final AbstractChannel ch) throws Exception {
-                ch.pipeline().addLast(hf.getDecoders());
-                ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
-            }
-        };
-    }
-
-    public static ChannelInitializer<AbstractChannel> createChannelWithEncoder(
-            final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
-            final @NonNull BmpSessionListenerFactory slf) {
-        return new ChannelInitializer<>() {
-            @Override
-            protected void initChannel(final AbstractChannel ch) throws Exception {
-                ch.pipeline().addLast(hf.getEncoders());
-                ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
-            }
-        };
-    }
-
-    /**
-     * To be used by BMP Dispatcher mainly.
-     */
-    public static ServerBootstrap createServerBootstrap(final @NonNull BmpSessionFactory sessionFactory,
-            final @NonNull BmpHandlerFactory hf, final @NonNull BmpSessionListenerFactory slf,
-            final @NonNull CreateChannel createChannel, final @NonNull EventLoopGroup bossGroup,
-            final @NonNull EventLoopGroup workerGroup, final @NonNull KeyMapping keys) {
-        return createServerBootstrap(sessionFactory, hf, slf, createChannel, bossGroup, workerGroup, keys, true);
-    }
-
-    public static ServerBootstrap createServerBootstrap(final @NonNull BmpSessionFactory sessionFactory,
-            final @NonNull BmpHandlerFactory hf, final @NonNull BmpSessionListenerFactory slf,
-            final @NonNull CreateChannel createChannel, final @NonNull EventLoopGroup bossGroup,
-            final @NonNull EventLoopGroup workerGroup, final @NonNull KeyMapping keys, final boolean tryEpollSocket) {
-
-        final ServerBootstrap serverBootstrap = new ServerBootstrap();
-        serverBootstrap.childHandler(createChannel.create(sessionFactory, hf, slf));
-        serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
-        serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-        serverBootstrap.group(bossGroup, workerGroup);
-
-        if (!tryEpollSocket) {
-            serverBootstrap.channel(NioServerSocketChannel.class);
-        } else {
-            if (Epoll.isAvailable()) {
-                serverBootstrap.channel(EpollServerSocketChannel.class);
-            } else {
-                serverBootstrap.channel(NioServerSocketChannel.class);
-            }
-
-            if (!keys.isEmpty()) {
-                if (Epoll.isAvailable()) {
-                    serverBootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap());
-                } else {
-                    throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
-                }
-            }
-        }
-
-        return serverBootstrap;
-    }
-
-    /**
-     * To be used by BMP Dispatcher mainly.
-     */
-    public static Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
-            final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
-            final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
-            final @NonNull EventLoopGroup workerGroup, final int connectTimeout, final @NonNull KeyMapping keys) {
-        return createClientBootstrap(sessionFactory, hf, createChannel, slf, remoteAddress, null,
-                workerGroup, connectTimeout, keys, false, true);
-    }
-
-    public static Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
-            final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
-            final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
-            final @Nullable SocketAddress localAddress, final @NonNull EventLoopGroup workerGroup,
-            final int connectTimeout, final @NonNull KeyMapping keys, final boolean reuseAddress,
-            final boolean tryEpollSocket) {
-        final Bootstrap bootstrap = new Bootstrap();
-        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
-        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
-        bootstrap.group(workerGroup);
-        bootstrap.handler(createChannel.create(sessionFactory, hf, slf));
-        if (localAddress != null) {
-            bootstrap.localAddress(localAddress);
-        }
-        bootstrap.remoteAddress(remoteAddress);
-
-        if (!tryEpollSocket) {
-            bootstrap.channel(NioSocketChannel.class);
-
-        } else {
-            if (Epoll.isAvailable()) {
-                bootstrap.channel(EpollSocketChannel.class);
-            } else {
-                bootstrap.channel(NioSocketChannel.class);
-            }
-            if (!keys.isEmpty()) {
-                if (Epoll.isAvailable()) {
-                    bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap());
-                } else {
-                    throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
-                }
-            }
-        }
-        return bootstrap;
-    }
-
-    @FunctionalInterface
-    public interface CreateChannel {
-        ChannelInitializer<AbstractChannel> create(@NonNull BmpSessionFactory sessionFactory,
-                @NonNull BmpHandlerFactory hf, @NonNull BmpSessionListenerFactory slf);
-    }
-}
diff --git a/bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpNettyGroups.java b/bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpNettyGroups.java
new file mode 100644 (file)
index 0000000..918bd19
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bmp.impl;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.AbstractBootstrap;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
+import org.opendaylight.protocol.concepts.KeyMapping;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+
+@Singleton
+@Component(service = BmpNettyGroups.class)
+public final class BmpNettyGroups implements AutoCloseable {
+    @FunctionalInterface
+    public interface CreateChannel {
+        ChannelInitializer<AbstractChannel> create(@NonNull BmpSessionFactory sessionFactory,
+                @NonNull BmpHandlerFactory hf, @NonNull BmpSessionListenerFactory slf);
+    }
+
+    private abstract static class AbstractImpl {
+        private final EventLoopGroup bossGroup;
+        private final EventLoopGroup workerGroup;
+
+        AbstractImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+            this.bossGroup = requireNonNull(bossGroup);
+            this.workerGroup = requireNonNull(workerGroup);
+        }
+
+        abstract void setupBootstrap(Bootstrap bootstrap);
+
+        abstract void setupBootstrap(ServerBootstrap serverBootstrap);
+
+        abstract void setupKeys(AbstractBootstrap<?, ?> bootstrap, KeyMapping keys);
+    }
+
+    private static final class EpollImpl extends AbstractImpl {
+        EpollImpl() {
+            super(new EpollEventLoopGroup(BOSS_TF), new EpollEventLoopGroup(WORKER_TF));
+        }
+
+        @Override
+        void setupBootstrap(final Bootstrap bootstrap) {
+            bootstrap.channel(EpollSocketChannel.class);
+        }
+
+        @Override
+        void setupBootstrap(final ServerBootstrap serverBootstrap) {
+            serverBootstrap.channel(EpollServerSocketChannel.class);
+        }
+
+        @Override
+        void setupKeys(final AbstractBootstrap<?, ?> bootstrap, final KeyMapping keys) {
+            bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap());
+        }
+    }
+
+    private static final class NioImpl extends AbstractImpl {
+        NioImpl() {
+            super(new NioEventLoopGroup(BOSS_TF), new NioEventLoopGroup(WORKER_TF));
+        }
+
+        @Override
+        void setupBootstrap(final Bootstrap bootstrap) {
+            bootstrap.channel(NioSocketChannel.class);
+        }
+
+        @Override
+        void setupBootstrap(final ServerBootstrap serverBootstrap) {
+            serverBootstrap.channel(NioServerSocketChannel.class);
+        }
+
+        @Override
+        void setupKeys(final AbstractBootstrap<?, ?> bootstrap, final KeyMapping keys) {
+            throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
+        }
+    }
+
+    private static final ThreadFactory BOSS_TF = new ThreadFactoryBuilder()
+        .setNameFormat("bmp-boss-%d")
+        .setDaemon(true)
+        .build();
+    private static final ThreadFactory WORKER_TF = new ThreadFactoryBuilder()
+        .setNameFormat("bmp-worker-%d")
+        .setDaemon(true)
+        .build();
+    private static final int MAX_CONNECTIONS_COUNT = 128;
+    private static final long TIMEOUT = 10;
+
+    private AbstractImpl impl;
+
+    @Inject
+    @Activate
+    public BmpNettyGroups() {
+        impl = Epoll.isAvailable() ? new EpollImpl() : new NioImpl();
+    }
+
+    @PreDestroy
+    @Deactivate
+    @Override
+    public void close() {
+        if (impl != null) {
+            impl.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
+            impl.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
+            impl = null;
+        }
+    }
+
+    public static ChannelInitializer<AbstractChannel> createChannelWithDecoder(
+            final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
+            final @NonNull BmpSessionListenerFactory slf) {
+        return new ChannelInitializer<>() {
+            @Override
+            protected void initChannel(final AbstractChannel ch) throws Exception {
+                ch.pipeline().addLast(hf.getDecoders()).addLast(sessionFactory.getSession(ch, slf));
+            }
+        };
+    }
+
+    public static ChannelInitializer<AbstractChannel> createChannelWithEncoder(
+            final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
+            final @NonNull BmpSessionListenerFactory slf) {
+        return new ChannelInitializer<>() {
+            @Override
+            protected void initChannel(final AbstractChannel ch) throws Exception {
+                ch.pipeline().addLast(hf.getEncoders()).addLast(sessionFactory.getSession(ch, slf));
+            }
+        };
+    }
+
+    /**
+     * To be used by BMP Dispatcher mainly.
+     */
+    public ServerBootstrap createServerBootstrap(final @NonNull BmpSessionFactory sessionFactory,
+            final @NonNull BmpHandlerFactory hf, final @NonNull BmpSessionListenerFactory slf,
+            final @NonNull CreateChannel createChannel, final @NonNull KeyMapping keys) {
+        final var serverBootstrap = new ServerBootstrap();
+        impl.setupBootstrap(serverBootstrap);
+        if (!keys.isEmpty()) {
+            impl.setupKeys(serverBootstrap, keys);
+        }
+
+        return serverBootstrap.childHandler(createChannel.create(sessionFactory, hf, slf))
+            .option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT)
+            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .group(impl.bossGroup, impl.workerGroup);
+    }
+
+    /**
+     * To be used by BMP Dispatcher mainly.
+     */
+    public Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
+            final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
+            final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
+            final int connectTimeout, final @NonNull KeyMapping keys) {
+        return createClientBootstrap(sessionFactory, hf, createChannel, slf, remoteAddress, null, connectTimeout, keys,
+            false);
+    }
+
+    public Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
+            final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
+            final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
+            final @Nullable SocketAddress localAddress, final int connectTimeout, final @NonNull KeyMapping keys,
+            final boolean reuseAddress) {
+        final var bootstrap = new Bootstrap();
+        impl.setupBootstrap(bootstrap);
+        if (!keys.isEmpty()) {
+            impl.setupKeys(bootstrap, keys);
+        }
+        if (localAddress != null) {
+            bootstrap.localAddress(localAddress);
+        }
+        return bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
+            .group(impl.workerGroup)
+            .handler(createChannel.create(sessionFactory, hf, slf))
+            .remoteAddress(remoteAddress);
+    }
+}
index a926a4769f8aea55fa0f96a267f1adb44279e5de..5b5b08e2c5bb5274c8bfe48275135f690f08713f 100644 (file)
@@ -57,6 +57,7 @@ import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
 import org.opendaylight.protocol.bmp.parser.BmpActivator;
@@ -164,8 +165,7 @@ public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
         bmpActivator.start(ctx);
         msgRegistry = ctx.getBmpMessageRegistry();
 
-        dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx,
-            new DefaultBmpSessionFactory());
+        dispatcher = new BmpDispatcherImpl(new BmpNettyGroups(), ctx, new DefaultBmpSessionFactory());
 
         final InetSocketAddress inetAddress = new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS),
             MONITOR_LOCAL_PORT);
index e1107cfbc4d6325adbb033dd67b9e7cb98a6d8f8..ae25c53ac720a8ac56969de763d50cc167566f5e 100644 (file)
@@ -18,7 +18,6 @@ import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
 import org.junit.After;
 import org.junit.Before;
@@ -32,6 +31,7 @@ import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderC
 import org.opendaylight.protocol.bmp.api.BmpSession;
 import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
 import org.opendaylight.protocol.bmp.parser.BmpActivator;
 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
 import org.opendaylight.protocol.concepts.KeyMapping;
@@ -64,7 +64,7 @@ public class BmpDispatcherImplTest {
         bmpActivator = new BmpActivator(context);
         bmpActivator.start(ctx);
 
-        dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx,
+        dispatcher = new BmpDispatcherImpl(new BmpNettyGroups(), ctx,
             (channel, sessionListenerFactory) -> BmpDispatcherImplTest.this.mockedSession);
     }
 
index 8cc8e61073bf2b357defad29a62fae281deaafce..2b617f10f5b51a9ec281365ddfefacb757c3696e 100644 (file)
@@ -14,8 +14,6 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Timer;
@@ -24,15 +22,14 @@ import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
-import org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil;
 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
 import org.opendaylight.protocol.concepts.KeyMapping;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class BmpMockDispatcher implements AutoCloseable {
-
     private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
     private static final int CONNECT_TIMEOUT = 2000;
     private static final int INITIAL_BACKOFF = 15_000;
@@ -40,8 +37,7 @@ final class BmpMockDispatcher implements AutoCloseable {
     private final BmpHandlerFactory hf;
     private final BmpSessionFactory sessionFactory;
 
-    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
-    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final BmpNettyGroups nettyGroups;
     private final BmpMockSessionListenerFactory slf;
     @GuardedBy("this")
     private boolean close;
@@ -51,13 +47,14 @@ final class BmpMockDispatcher implements AutoCloseable {
         slf = new BmpMockSessionListenerFactory();
         requireNonNull(registry);
         hf = new BmpHandlerFactory(registry);
+        nettyGroups = new BmpNettyGroups();
     }
 
     ChannelFuture createClient(final @NonNull SocketAddress localAddress,
             final @NonNull InetSocketAddress remoteAddress) {
-        final Bootstrap bootstrap = BmpDispatcherUtil.createClientBootstrap(sessionFactory, hf,
-                BmpDispatcherUtil::createChannelWithEncoder, slf, remoteAddress, localAddress, workerGroup,
-                CONNECT_TIMEOUT, KeyMapping.of(), true, false);
+        final Bootstrap bootstrap = nettyGroups.createClientBootstrap(sessionFactory, hf,
+            BmpNettyGroups::createChannelWithEncoder, slf, remoteAddress, localAddress, CONNECT_TIMEOUT,
+            KeyMapping.of(), true);
         final ChannelFuture channelFuture = bootstrap.connect(remoteAddress);
         LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
         channelFuture.addListener(new BootstrapListener(bootstrap, localAddress, remoteAddress));
@@ -66,9 +63,8 @@ final class BmpMockDispatcher implements AutoCloseable {
 
     ChannelFuture createServer(final InetSocketAddress localAddress) {
         requireNonNull(localAddress);
-        final ServerBootstrap serverBootstrap = BmpDispatcherUtil.createServerBootstrap(sessionFactory,
-                hf, slf, BmpDispatcherUtil::createChannelWithEncoder,
-                bossGroup, workerGroup, KeyMapping.of(), false);
+        final ServerBootstrap serverBootstrap = nettyGroups.createServerBootstrap(sessionFactory, hf, slf,
+            BmpNettyGroups::createChannelWithEncoder, KeyMapping.of());
         final ChannelFuture channelFuture = serverBootstrap.bind(localAddress);
         LOG.info("Initiated BMP server at {}.", localAddress);
         return channelFuture;
@@ -76,7 +72,10 @@ final class BmpMockDispatcher implements AutoCloseable {
 
     @Override
     public synchronized void close() {
-        close = true;
+        if (!close) {
+            close = true;
+            nettyGroups.close();
+        }
     }
 
     private class BootstrapListener implements ChannelFutureListener {
index e7a2f841acb5fb2bfb9e81980d12c0206677d4e1..50538a8e1fe128e22cea22adcbc6b8d9d75b1cf7 100644 (file)
@@ -16,7 +16,6 @@ import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
 import com.google.common.net.InetAddresses;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
 import org.junit.Before;
 import org.junit.Test;
@@ -26,6 +25,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
 import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
 import org.opendaylight.protocol.bmp.spi.registry.BmpExtensionProviderContext;
 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
@@ -57,8 +57,8 @@ public class BmpMockDispatcherTest {
         final int port = InetSocketAddressUtil.getRandomPort();
         final InetSocketAddress serverAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
 
-        final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
-                new NioEventLoopGroup(), new NioEventLoopGroup(), ctx, sessionFactory);
+        var threadGroups = new BmpNettyGroups();
+        final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(threadGroups, ctx, sessionFactory);
         final ChannelFuture futureServer = bmpDispatcher
                 .createServer(serverAddr, slf, KeyMapping.of());
         waitFutureSuccess(futureServer);
@@ -71,15 +71,17 @@ public class BmpMockDispatcherTest {
         checkEquals(() -> assertTrue(sl.getStatus()));
         channel.close();
         bmpDispatcher.close();
+        threadGroups.close();
         checkEquals(() -> assertFalse(sl.getStatus()));
 
-        final BmpDispatcherImpl bmpDispatcher2 = new BmpDispatcherImpl(
-                new NioEventLoopGroup(), new NioEventLoopGroup(), ctx, sessionFactory);
+        threadGroups = new BmpNettyGroups();
+        final BmpDispatcherImpl bmpDispatcher2 = new BmpDispatcherImpl(threadGroups, ctx, sessionFactory);
         final ChannelFuture futureServer2 = bmpDispatcher2.createServer(serverAddr, slf, KeyMapping.of());
         futureServer2.sync();
         checkEquals(() -> assertTrue(sl.getStatus()));
 
         bmpDispatcher2.close();
+        threadGroups.close();
         bmpMockDispatcher.close();
         checkEquals(() -> assertFalse(sl.getStatus()));
     }
@@ -87,8 +89,8 @@ public class BmpMockDispatcherTest {
     @Test(timeout = 20000)
     public void testCreateServer() throws Exception {
         final int port = InetSocketAddressUtil.getRandomPort();
-        final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
-                new NioEventLoopGroup(), new NioEventLoopGroup(), ctx, sessionFactory);
+        final BmpNettyGroups threadGroups = new BmpNettyGroups();
+        final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(threadGroups, ctx, sessionFactory);
         final ChannelFuture futureServer = bmpMockDispatcher.createServer(
                 new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port));
         futureServer.sync();
@@ -102,6 +104,7 @@ public class BmpMockDispatcherTest {
         channel.close();
 
         bmpDispatcher.close();
+        threadGroups.close();
         bmpMockDispatcher.close();
         checkEquals(() -> assertFalse(sl.getStatus()));
     }
index fa1f63708e6a591605d1232ac9235d43ed46457b..a5389f77406ca2b64483adc5f0dffb9fcab785d2 100644 (file)
@@ -14,7 +14,6 @@ import static org.mockito.Mockito.verify;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.ServiceLoader;
@@ -28,6 +27,7 @@ import org.opendaylight.protocol.bmp.api.BmpSession;
 import org.opendaylight.protocol.bmp.api.BmpSessionListener;
 import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
 import org.opendaylight.protocol.bmp.parser.BmpActivator;
 import org.opendaylight.protocol.bmp.spi.registry.BmpExtensionProviderActivator;
@@ -48,8 +48,7 @@ public class BmpMockTest {
         bmpActivator = new BmpActivator(
             ServiceLoader.load(BGPExtensionConsumerContext.class).findFirst().orElseThrow());
         bmpActivator.start(ctx);
-        bmpDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx,
-            new DefaultBmpSessionFactory());
+        bmpDispatcher = new BmpDispatcherImpl(new BmpNettyGroups(), ctx, new DefaultBmpSessionFactory());
     }
 
     @After