package org.opendaylight.protocol.bmp.impl;
import static java.util.Objects.requireNonNull;
-import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createClientBootstrap;
-import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createServerBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.Timer;
import java.util.TimerTask;
private static final int CONNECT_TIMEOUT = 5000;
private static final int INITIAL_BACKOFF = 30_000;
private static final int MAXIMUM_BACKOFF = 720_000;
- private static final long TIMEOUT = 10;
private final BmpHandlerFactory hf;
- private final EventLoopGroup bossGroup;
- private final EventLoopGroup workerGroup;
+ private final BmpNettyGroups nettyGroups;
private final BmpSessionFactory sessionFactory;
@GuardedBy("this")
private boolean close;
+
@Activate
- public BmpDispatcherImpl(
- @Reference(target = "(type=global-boss-group)") final EventLoopGroup bossGroup,
- @Reference(target = "(type=global-worker-group)") final EventLoopGroup workerGroup,
+ public BmpDispatcherImpl(@Reference final BmpNettyGroups nettyGroups,
@Reference final BmpExtensionConsumerContext ctx, @Reference final BmpSessionFactory sessionFactory) {
- if (Epoll.isAvailable()) {
- this.bossGroup = new EpollEventLoopGroup();
- this.workerGroup = new EpollEventLoopGroup();
- } else {
- this.bossGroup = requireNonNull(bossGroup);
- this.workerGroup = requireNonNull(workerGroup);
- }
- this.hf = new BmpHandlerFactory(ctx.getBmpMessageRegistry());
+ this.nettyGroups = requireNonNull(nettyGroups);
+ hf = new BmpHandlerFactory(ctx.getBmpMessageRegistry());
this.sessionFactory = requireNonNull(sessionFactory);
}
@Override
public ChannelFuture createClient(final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf,
final KeyMapping keys) {
- final Bootstrap bootstrap = createClientBootstrap(this.sessionFactory, this.hf,
- BmpDispatcherUtil::createChannelWithDecoder, slf, remoteAddress, this.workerGroup,
- CONNECT_TIMEOUT, keys);
+ final Bootstrap bootstrap = nettyGroups.createClientBootstrap(sessionFactory, hf,
+ BmpNettyGroups::createChannelWithDecoder, slf, remoteAddress, CONNECT_TIMEOUT, keys);
final ChannelFuture channelPromise = bootstrap.connect();
channelPromise.addListener(new BootstrapListener(bootstrap, remoteAddress, slf, keys));
LOG.debug("Initiated BMP Client {} at {}.", channelPromise, remoteAddress);
@Override
public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf,
final KeyMapping keys) {
- final ServerBootstrap serverBootstrap = createServerBootstrap(this.sessionFactory, this.hf, slf,
- BmpDispatcherUtil::createChannelWithDecoder, this.bossGroup, this.workerGroup, keys);
+ final ServerBootstrap serverBootstrap = nettyGroups.createServerBootstrap(sessionFactory, hf, slf,
+ BmpNettyGroups::createChannelWithDecoder, keys);
final ChannelFuture channelFuture = serverBootstrap.bind(address);
LOG.debug("Initiated BMP server {} at {}.", channelFuture, address);
return channelFuture;
@PreDestroy
@Override
public synchronized void close() {
- this.close = true;
- if (Epoll.isAvailable()) {
- this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
- this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
- }
+ close = true;
}
private class BootstrapListener implements ChannelFutureListener {
final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf, final KeyMapping keys) {
this.bootstrap = bootstrap;
this.remoteAddress = remoteAddress;
- this.delay = INITIAL_BACKOFF;
+ delay = INITIAL_BACKOFF;
this.slf = slf;
this.keys = keys;
}
LOG.debug("Connection {} succeeded!", future);
future.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> scheduleConnect());
} else {
- if (this.delay > MAXIMUM_BACKOFF) {
+ if (delay > MAXIMUM_BACKOFF) {
LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP "
- + "router {}.", this.remoteAddress);
+ + "router {}.", remoteAddress);
future.cancel(false);
return;
}
final EventLoop loop = future.channel().eventLoop();
- loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
+ loop.schedule(() -> bootstrap.connect().addListener(this), delay, TimeUnit.MILLISECONDS);
LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
- this.remoteAddress, this.delay);
- this.delay *= 2;
+ remoteAddress, delay);
+ delay *= 2;
}
}
private void scheduleConnect() {
- if (!BmpDispatcherImpl.this.close) {
+ if (!close) {
timer.schedule(new TimerTask() {
@Override
public void run() {
- createClient(BootstrapListener.this.remoteAddress, BootstrapListener.this.slf,
- BootstrapListener.this.keys);
+ createClient(remoteAddress, slf, keys);
}
}, 5);
}
+++ /dev/null
-/*
- * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.protocol.bmp.impl;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AbstractChannel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollChannelOption;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
-import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
-import org.opendaylight.protocol.concepts.KeyMapping;
-
-public final class BmpDispatcherUtil {
- private static final int MAX_CONNECTIONS_COUNT = 128;
-
- private BmpDispatcherUtil() {
- // Hidden on purpose
- }
-
- public static ChannelInitializer<AbstractChannel> createChannelWithDecoder(
- final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
- final @NonNull BmpSessionListenerFactory slf) {
- return new ChannelInitializer<>() {
- @Override
- protected void initChannel(final AbstractChannel ch) throws Exception {
- ch.pipeline().addLast(hf.getDecoders());
- ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
- }
- };
- }
-
- public static ChannelInitializer<AbstractChannel> createChannelWithEncoder(
- final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
- final @NonNull BmpSessionListenerFactory slf) {
- return new ChannelInitializer<>() {
- @Override
- protected void initChannel(final AbstractChannel ch) throws Exception {
- ch.pipeline().addLast(hf.getEncoders());
- ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
- }
- };
- }
-
- /**
- * To be used by BMP Dispatcher mainly.
- */
- public static ServerBootstrap createServerBootstrap(final @NonNull BmpSessionFactory sessionFactory,
- final @NonNull BmpHandlerFactory hf, final @NonNull BmpSessionListenerFactory slf,
- final @NonNull CreateChannel createChannel, final @NonNull EventLoopGroup bossGroup,
- final @NonNull EventLoopGroup workerGroup, final @NonNull KeyMapping keys) {
- return createServerBootstrap(sessionFactory, hf, slf, createChannel, bossGroup, workerGroup, keys, true);
- }
-
- public static ServerBootstrap createServerBootstrap(final @NonNull BmpSessionFactory sessionFactory,
- final @NonNull BmpHandlerFactory hf, final @NonNull BmpSessionListenerFactory slf,
- final @NonNull CreateChannel createChannel, final @NonNull EventLoopGroup bossGroup,
- final @NonNull EventLoopGroup workerGroup, final @NonNull KeyMapping keys, final boolean tryEpollSocket) {
-
- final ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.childHandler(createChannel.create(sessionFactory, hf, slf));
- serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
- serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- serverBootstrap.group(bossGroup, workerGroup);
-
- if (!tryEpollSocket) {
- serverBootstrap.channel(NioServerSocketChannel.class);
- } else {
- if (Epoll.isAvailable()) {
- serverBootstrap.channel(EpollServerSocketChannel.class);
- } else {
- serverBootstrap.channel(NioServerSocketChannel.class);
- }
-
- if (!keys.isEmpty()) {
- if (Epoll.isAvailable()) {
- serverBootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap());
- } else {
- throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
- }
- }
- }
-
- return serverBootstrap;
- }
-
- /**
- * To be used by BMP Dispatcher mainly.
- */
- public static Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
- final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
- final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
- final @NonNull EventLoopGroup workerGroup, final int connectTimeout, final @NonNull KeyMapping keys) {
- return createClientBootstrap(sessionFactory, hf, createChannel, slf, remoteAddress, null,
- workerGroup, connectTimeout, keys, false, true);
- }
-
- public static Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
- final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
- final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
- final @Nullable SocketAddress localAddress, final @NonNull EventLoopGroup workerGroup,
- final int connectTimeout, final @NonNull KeyMapping keys, final boolean reuseAddress,
- final boolean tryEpollSocket) {
- final Bootstrap bootstrap = new Bootstrap();
- bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
- bootstrap.group(workerGroup);
- bootstrap.handler(createChannel.create(sessionFactory, hf, slf));
- if (localAddress != null) {
- bootstrap.localAddress(localAddress);
- }
- bootstrap.remoteAddress(remoteAddress);
-
- if (!tryEpollSocket) {
- bootstrap.channel(NioSocketChannel.class);
-
- } else {
- if (Epoll.isAvailable()) {
- bootstrap.channel(EpollSocketChannel.class);
- } else {
- bootstrap.channel(NioSocketChannel.class);
- }
- if (!keys.isEmpty()) {
- if (Epoll.isAvailable()) {
- bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap());
- } else {
- throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
- }
- }
- }
- return bootstrap;
- }
-
- @FunctionalInterface
- public interface CreateChannel {
- ChannelInitializer<AbstractChannel> create(@NonNull BmpSessionFactory sessionFactory,
- @NonNull BmpHandlerFactory hf, @NonNull BmpSessionListenerFactory slf);
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bmp.impl;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.AbstractBootstrap;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
+import org.opendaylight.protocol.concepts.KeyMapping;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+
+@Singleton
+@Component(service = BmpNettyGroups.class)
+public final class BmpNettyGroups implements AutoCloseable {
+ @FunctionalInterface
+ public interface CreateChannel {
+ ChannelInitializer<AbstractChannel> create(@NonNull BmpSessionFactory sessionFactory,
+ @NonNull BmpHandlerFactory hf, @NonNull BmpSessionListenerFactory slf);
+ }
+
+ private abstract static class AbstractImpl {
+ private final EventLoopGroup bossGroup;
+ private final EventLoopGroup workerGroup;
+
+ AbstractImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+ this.bossGroup = requireNonNull(bossGroup);
+ this.workerGroup = requireNonNull(workerGroup);
+ }
+
+ abstract void setupBootstrap(Bootstrap bootstrap);
+
+ abstract void setupBootstrap(ServerBootstrap serverBootstrap);
+
+ abstract void setupKeys(AbstractBootstrap<?, ?> bootstrap, KeyMapping keys);
+ }
+
+ private static final class EpollImpl extends AbstractImpl {
+ EpollImpl() {
+ super(new EpollEventLoopGroup(BOSS_TF), new EpollEventLoopGroup(WORKER_TF));
+ }
+
+ @Override
+ void setupBootstrap(final Bootstrap bootstrap) {
+ bootstrap.channel(EpollSocketChannel.class);
+ }
+
+ @Override
+ void setupBootstrap(final ServerBootstrap serverBootstrap) {
+ serverBootstrap.channel(EpollServerSocketChannel.class);
+ }
+
+ @Override
+ void setupKeys(final AbstractBootstrap<?, ?> bootstrap, final KeyMapping keys) {
+ bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap());
+ }
+ }
+
+ private static final class NioImpl extends AbstractImpl {
+ NioImpl() {
+ super(new NioEventLoopGroup(BOSS_TF), new NioEventLoopGroup(WORKER_TF));
+ }
+
+ @Override
+ void setupBootstrap(final Bootstrap bootstrap) {
+ bootstrap.channel(NioSocketChannel.class);
+ }
+
+ @Override
+ void setupBootstrap(final ServerBootstrap serverBootstrap) {
+ serverBootstrap.channel(NioServerSocketChannel.class);
+ }
+
+ @Override
+ void setupKeys(final AbstractBootstrap<?, ?> bootstrap, final KeyMapping keys) {
+ throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
+ }
+ }
+
+ private static final ThreadFactory BOSS_TF = new ThreadFactoryBuilder()
+ .setNameFormat("bmp-boss-%d")
+ .setDaemon(true)
+ .build();
+ private static final ThreadFactory WORKER_TF = new ThreadFactoryBuilder()
+ .setNameFormat("bmp-worker-%d")
+ .setDaemon(true)
+ .build();
+ private static final int MAX_CONNECTIONS_COUNT = 128;
+ private static final long TIMEOUT = 10;
+
+ private AbstractImpl impl;
+
+ @Inject
+ @Activate
+ public BmpNettyGroups() {
+ impl = Epoll.isAvailable() ? new EpollImpl() : new NioImpl();
+ }
+
+ @PreDestroy
+ @Deactivate
+ @Override
+ public void close() {
+ if (impl != null) {
+ impl.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
+ impl.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
+ impl = null;
+ }
+ }
+
+ public static ChannelInitializer<AbstractChannel> createChannelWithDecoder(
+ final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
+ final @NonNull BmpSessionListenerFactory slf) {
+ return new ChannelInitializer<>() {
+ @Override
+ protected void initChannel(final AbstractChannel ch) throws Exception {
+ ch.pipeline().addLast(hf.getDecoders()).addLast(sessionFactory.getSession(ch, slf));
+ }
+ };
+ }
+
+ public static ChannelInitializer<AbstractChannel> createChannelWithEncoder(
+ final @NonNull BmpSessionFactory sessionFactory, final @NonNull BmpHandlerFactory hf,
+ final @NonNull BmpSessionListenerFactory slf) {
+ return new ChannelInitializer<>() {
+ @Override
+ protected void initChannel(final AbstractChannel ch) throws Exception {
+ ch.pipeline().addLast(hf.getEncoders()).addLast(sessionFactory.getSession(ch, slf));
+ }
+ };
+ }
+
+ /**
+ * To be used by BMP Dispatcher mainly.
+ */
+ public ServerBootstrap createServerBootstrap(final @NonNull BmpSessionFactory sessionFactory,
+ final @NonNull BmpHandlerFactory hf, final @NonNull BmpSessionListenerFactory slf,
+ final @NonNull CreateChannel createChannel, final @NonNull KeyMapping keys) {
+ final var serverBootstrap = new ServerBootstrap();
+ impl.setupBootstrap(serverBootstrap);
+ if (!keys.isEmpty()) {
+ impl.setupKeys(serverBootstrap, keys);
+ }
+
+ return serverBootstrap.childHandler(createChannel.create(sessionFactory, hf, slf))
+ .option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .group(impl.bossGroup, impl.workerGroup);
+ }
+
+ /**
+ * To be used by BMP Dispatcher mainly.
+ */
+ public Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
+ final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
+ final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
+ final int connectTimeout, final @NonNull KeyMapping keys) {
+ return createClientBootstrap(sessionFactory, hf, createChannel, slf, remoteAddress, null, connectTimeout, keys,
+ false);
+ }
+
+ public Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
+ final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
+ final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
+ final @Nullable SocketAddress localAddress, final int connectTimeout, final @NonNull KeyMapping keys,
+ final boolean reuseAddress) {
+ final var bootstrap = new Bootstrap();
+ impl.setupBootstrap(bootstrap);
+ if (!keys.isEmpty()) {
+ impl.setupKeys(bootstrap, keys);
+ }
+ if (localAddress != null) {
+ bootstrap.localAddress(localAddress);
+ }
+ return bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
+ .group(impl.workerGroup)
+ .handler(createChannel.create(sessionFactory, hf, slf))
+ .remoteAddress(remoteAddress);
+ }
+}
import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
import org.opendaylight.protocol.bmp.parser.BmpActivator;
bmpActivator.start(ctx);
msgRegistry = ctx.getBmpMessageRegistry();
- dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx,
- new DefaultBmpSessionFactory());
+ dispatcher = new BmpDispatcherImpl(new BmpNettyGroups(), ctx, new DefaultBmpSessionFactory());
final InetSocketAddress inetAddress = new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS),
MONITOR_LOCAL_PORT);
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import org.junit.After;
import org.junit.Before;
import org.opendaylight.protocol.bmp.api.BmpSession;
import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
import org.opendaylight.protocol.bmp.parser.BmpActivator;
import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
import org.opendaylight.protocol.concepts.KeyMapping;
bmpActivator = new BmpActivator(context);
bmpActivator.start(ctx);
- dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx,
+ dispatcher = new BmpDispatcherImpl(new BmpNettyGroups(), ctx,
(channel, sessionListenerFactory) -> BmpDispatcherImplTest.this.mockedSession);
}
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Timer;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
-import org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil;
import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
import org.opendaylight.protocol.concepts.KeyMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class BmpMockDispatcher implements AutoCloseable {
-
private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
private static final int CONNECT_TIMEOUT = 2000;
private static final int INITIAL_BACKOFF = 15_000;
private final BmpHandlerFactory hf;
private final BmpSessionFactory sessionFactory;
- private final EventLoopGroup bossGroup = new NioEventLoopGroup();
- private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private final BmpNettyGroups nettyGroups;
private final BmpMockSessionListenerFactory slf;
@GuardedBy("this")
private boolean close;
slf = new BmpMockSessionListenerFactory();
requireNonNull(registry);
hf = new BmpHandlerFactory(registry);
+ nettyGroups = new BmpNettyGroups();
}
ChannelFuture createClient(final @NonNull SocketAddress localAddress,
final @NonNull InetSocketAddress remoteAddress) {
- final Bootstrap bootstrap = BmpDispatcherUtil.createClientBootstrap(sessionFactory, hf,
- BmpDispatcherUtil::createChannelWithEncoder, slf, remoteAddress, localAddress, workerGroup,
- CONNECT_TIMEOUT, KeyMapping.of(), true, false);
+ final Bootstrap bootstrap = nettyGroups.createClientBootstrap(sessionFactory, hf,
+ BmpNettyGroups::createChannelWithEncoder, slf, remoteAddress, localAddress, CONNECT_TIMEOUT,
+ KeyMapping.of(), true);
final ChannelFuture channelFuture = bootstrap.connect(remoteAddress);
LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
channelFuture.addListener(new BootstrapListener(bootstrap, localAddress, remoteAddress));
ChannelFuture createServer(final InetSocketAddress localAddress) {
requireNonNull(localAddress);
- final ServerBootstrap serverBootstrap = BmpDispatcherUtil.createServerBootstrap(sessionFactory,
- hf, slf, BmpDispatcherUtil::createChannelWithEncoder,
- bossGroup, workerGroup, KeyMapping.of(), false);
+ final ServerBootstrap serverBootstrap = nettyGroups.createServerBootstrap(sessionFactory, hf, slf,
+ BmpNettyGroups::createChannelWithEncoder, KeyMapping.of());
final ChannelFuture channelFuture = serverBootstrap.bind(localAddress);
LOG.info("Initiated BMP server at {}.", localAddress);
return channelFuture;
@Override
public synchronized void close() {
- close = true;
+ if (!close) {
+ close = true;
+ nettyGroups.close();
+ }
}
private class BootstrapListener implements ChannelFutureListener {
import com.google.common.net.InetAddresses;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
import org.opendaylight.protocol.bmp.spi.registry.BmpExtensionProviderContext;
import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
final int port = InetSocketAddressUtil.getRandomPort();
final InetSocketAddress serverAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
- final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
- new NioEventLoopGroup(), new NioEventLoopGroup(), ctx, sessionFactory);
+ var threadGroups = new BmpNettyGroups();
+ final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(threadGroups, ctx, sessionFactory);
final ChannelFuture futureServer = bmpDispatcher
.createServer(serverAddr, slf, KeyMapping.of());
waitFutureSuccess(futureServer);
checkEquals(() -> assertTrue(sl.getStatus()));
channel.close();
bmpDispatcher.close();
+ threadGroups.close();
checkEquals(() -> assertFalse(sl.getStatus()));
- final BmpDispatcherImpl bmpDispatcher2 = new BmpDispatcherImpl(
- new NioEventLoopGroup(), new NioEventLoopGroup(), ctx, sessionFactory);
+ threadGroups = new BmpNettyGroups();
+ final BmpDispatcherImpl bmpDispatcher2 = new BmpDispatcherImpl(threadGroups, ctx, sessionFactory);
final ChannelFuture futureServer2 = bmpDispatcher2.createServer(serverAddr, slf, KeyMapping.of());
futureServer2.sync();
checkEquals(() -> assertTrue(sl.getStatus()));
bmpDispatcher2.close();
+ threadGroups.close();
bmpMockDispatcher.close();
checkEquals(() -> assertFalse(sl.getStatus()));
}
@Test(timeout = 20000)
public void testCreateServer() throws Exception {
final int port = InetSocketAddressUtil.getRandomPort();
- final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
- new NioEventLoopGroup(), new NioEventLoopGroup(), ctx, sessionFactory);
+ final BmpNettyGroups threadGroups = new BmpNettyGroups();
+ final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(threadGroups, ctx, sessionFactory);
final ChannelFuture futureServer = bmpMockDispatcher.createServer(
new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port));
futureServer.sync();
channel.close();
bmpDispatcher.close();
+ threadGroups.close();
bmpMockDispatcher.close();
checkEquals(() -> assertFalse(sl.getStatus()));
}
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.ServiceLoader;
import org.opendaylight.protocol.bmp.api.BmpSessionListener;
import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
+import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
import org.opendaylight.protocol.bmp.parser.BmpActivator;
import org.opendaylight.protocol.bmp.spi.registry.BmpExtensionProviderActivator;
bmpActivator = new BmpActivator(
ServiceLoader.load(BGPExtensionConsumerContext.class).findFirst().orElseThrow());
bmpActivator.start(ctx);
- bmpDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx,
- new DefaultBmpSessionFactory());
+ bmpDispatcher = new BmpDispatcherImpl(new BmpNettyGroups(), ctx, new DefaultBmpSessionFactory());
}
@After