From 19359ff05eb84d6e83cb6252fd0764dd8a686a0f Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 29 Feb 2024 14:49:53 +0100 Subject: [PATCH] Disconnect bgp-rib-impl from global event loop groups We are dancing around event loop group lifecycle based on epoll availability. Since the global groups are going away anyway, extract this complexity into an artifact. Change-Id: Ifdc5404603c96a939659d13e339a7f83c268c2d0 Signed-off-by: Robert Varga --- .../bgp/rib/impl/BGPDispatcherImpl.java | 105 +++----------- .../protocol/bgp/rib/impl/BGPNettyGroups.java | 132 ++++++++++++++++++ .../bgp/rib/impl/AbstractAddPathTest.java | 25 +--- .../rib/impl/AbstractBGPDispatcherTest.java | 22 +-- .../protocol/bgp/testtool/BGPTestTool.java | 8 +- 5 files changed, 165 insertions(+), 127 deletions(-) create mode 100644 bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPNettyGroups.java diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java index 0aa3bd6d23..5d321ba29a 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java @@ -18,25 +18,14 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.epoll.Epoll; -import io.netty.channel.epoll.EpollChannelOption; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollMode; -import io.netty.channel.epoll.EpollServerSocketChannel; -import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; -import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionConsumerContext; @@ -50,7 +39,6 @@ import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory; import org.opendaylight.protocol.concepts.KeyMapping; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,11 +47,10 @@ import org.slf4j.LoggerFactory; * Implementation of BGPDispatcher. */ @Singleton -@Component(immediate = true, service = BGPDispatcher.class) -public final class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable { +@Component(immediate = true) +public final class BGPDispatcherImpl implements BGPDispatcher { private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class); private static final int SOCKET_BACKLOG_SIZE = 128; - private static final long TIMEOUT = 10; private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024); @@ -74,23 +61,14 @@ public final class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable { private static final RecvByteBufAllocator RECV_ALLOCATOR = new AdaptiveRecvByteBufAllocator().maxMessagesPerRead(1); private final BGPHandlerFactory handlerFactory; - private final EventLoopGroup bossGroup; - private final EventLoopGroup workerGroup; private final BGPPeerRegistry bgpPeerRegistry; + private final BGPNettyGroups nettyGroups; @Inject @Activate public BGPDispatcherImpl(@Reference final BGPExtensionConsumerContext extensions, - @Reference(target = "(type=global-boss-group)") final EventLoopGroup bossGroup, - @Reference(target = "(type=global-worker-group)") final EventLoopGroup workerGroup, - @Reference final BGPPeerRegistry bgpPeerRegistry) { - if (Epoll.isAvailable()) { - this.bossGroup = new EpollEventLoopGroup(); - this.workerGroup = new EpollEventLoopGroup(); - } else { - this.bossGroup = requireNonNull(bossGroup); - this.workerGroup = requireNonNull(workerGroup); - } + @Reference final BGPNettyGroups nettyGroups, @Reference final BGPPeerRegistry bgpPeerRegistry) { + this.nettyGroups = requireNonNull(nettyGroups); this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry); handlerFactory = new BGPHandlerFactory(extensions.getMessageRegistry()); } @@ -113,44 +91,13 @@ public final class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable { private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress, final InetSocketAddress localAddress) { - final Bootstrap bootstrap = new Bootstrap(); - if (Epoll.isAvailable()) { - bootstrap.channel(EpollSocketChannel.class); - bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); - } else { - bootstrap.channel(NioSocketChannel.class); - } - if (keys != null && !keys.isEmpty()) { - if (Epoll.isAvailable()) { - bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap()); - } else { - throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause()); - } - } - - // Make sure we are doing round-robin processing - bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR); - bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); - bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK); - bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); - - if (bootstrap.config().group() == null) { - bootstrap.group(workerGroup); - } - bootstrap.localAddress(localAddress); - - return bootstrap; - } - - @Deactivate - @PreDestroy - @Override - public synchronized void close() { - if (Epoll.isAvailable()) { - LOG.debug("Closing Dispatcher"); - workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); - bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); - } + return nettyGroups.createBootstrap(keys) + // Make sure we are doing round-robin processing + .option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR) + .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK) + .option(ChannelOption.SO_REUSEADDR, reuseAddress) + .localAddress(localAddress); } @Override @@ -189,27 +136,13 @@ public final class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable { } private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) { - final ServerBootstrap serverBootstrap = new ServerBootstrap(); - if (Epoll.isAvailable()) { - serverBootstrap.channel(EpollServerSocketChannel.class); - serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); - } else { - serverBootstrap.channel(NioServerSocketChannel.class); - } - final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer); - serverBootstrap.childHandler(serverChannelHandler); - - serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE); - serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); - serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK); - - // Make sure we are doing round-robin processing - serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR); - - if (serverBootstrap.config().group() == null) { - serverBootstrap.group(bossGroup, workerGroup); - } - return serverBootstrap; + return nettyGroups.createServerBootstrap() + .childHandler(BGPChannel.createServerChannelHandler(initializer)) + .option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK) + // Make sure we are doing round-robin processing + .option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR); } private static final class BGPChannel { diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPNettyGroups.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPNettyGroups.java new file mode 100644 index 0000000000..7971251fad --- /dev/null +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPNettyGroups.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.protocol.bgp.rib.impl; + +import static java.util.Objects.requireNonNull; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollChannelOption; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollMode; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.protocol.concepts.KeyMapping; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; + +@Singleton +@Component(service = BGPNettyGroups.class) +public final class BGPNettyGroups implements AutoCloseable { + private abstract static class AbstractImpl { + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + + AbstractImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) { + this.bossGroup = requireNonNull(bossGroup); + this.workerGroup = requireNonNull(workerGroup); + } + + abstract void setupBootstrap(Bootstrap bootstrap); + + abstract void setupBootstrap(ServerBootstrap serverBootstrap); + + abstract void setupKeys(Bootstrap bootstrap, KeyMapping keys); + } + + private static final class EpollImpl extends AbstractImpl { + EpollImpl() { + super(new EpollEventLoopGroup(), new EpollEventLoopGroup()); + } + + @Override + void setupBootstrap(final Bootstrap bootstrap) { + bootstrap.channel(EpollSocketChannel.class); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } + + @Override + void setupBootstrap(final ServerBootstrap serverBootstrap) { + serverBootstrap.channel(EpollServerSocketChannel.class); + serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } + + @Override + void setupKeys(final Bootstrap bootstrap, final KeyMapping keys) { + bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap()); + } + } + + private static final class NioImpl extends AbstractImpl { + NioImpl() { + super(new NioEventLoopGroup(), new NioEventLoopGroup()); + } + + @Override + void setupBootstrap(final Bootstrap bootstrap) { + bootstrap.channel(NioSocketChannel.class); + } + + @Override + void setupBootstrap(final ServerBootstrap serverBootstrap) { + serverBootstrap.channel(NioServerSocketChannel.class); + } + + @Override + void setupKeys(final Bootstrap bootstrap, final KeyMapping keys) { + throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause()); + } + } + + private static final long TIMEOUT = 10; + + private AbstractImpl impl; + + @Inject + @Activate + public BGPNettyGroups() { + impl = Epoll.isAvailable() ? new EpollImpl() : new NioImpl(); + } + + @PreDestroy + @Deactivate + @Override + public void close() { + if (impl != null) { + impl.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); + impl.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); + impl = null; + } + } + + Bootstrap createBootstrap(final @Nullable KeyMapping keys) { + final var bootstrap = new Bootstrap(); + impl.setupBootstrap(bootstrap); + if (keys != null && !keys.isEmpty()) { + impl.setupKeys(bootstrap, keys); + } + return bootstrap.group(impl.workerGroup); + } + + ServerBootstrap createServerBootstrap() { + final var bootstrap = new ServerBootstrap(); + impl.setupBootstrap(bootstrap); + return bootstrap.group(impl.bossGroup, impl.workerGroup); + } +} diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractAddPathTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractAddPathTest.java index d57830e0e8..8a1dd5dc0d 100644 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractAddPathTest.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractAddPathTest.java @@ -14,8 +14,6 @@ import static org.opendaylight.protocol.util.CheckUtil.readDataOperational; import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess; import com.google.common.collect.Lists; -import io.netty.channel.epoll.Epoll; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.Future; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -23,7 +21,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.mockito.Mock; @@ -130,8 +127,7 @@ public abstract class AbstractAddPathTest extends DefaultRibPoliciesMockTest { private final BGPExtensionProviderContext context = new SimpleBGPExtensionProviderContext(); private final RIBActivator ribActivator = new RIBActivator(); private BGPActivator bgpActivator; - private NioEventLoopGroup worker; - private NioEventLoopGroup boss; + private BGPNettyGroups groups; private org.opendaylight.protocol.bgp.inet.BGPActivator inetActivator; protected StrictBGPPeerRegistry serverRegistry; protected ConstantCodecsRegistry codecsRegistry; @@ -149,12 +145,9 @@ public abstract class AbstractAddPathTest extends DefaultRibPoliciesMockTest { inetActivator = new org.opendaylight.protocol.bgp.inet.BGPActivator(); bgpActivator.start(context); inetActivator.start(context); - if (!Epoll.isAvailable()) { - worker = new NioEventLoopGroup(); - boss = new NioEventLoopGroup(); - } + groups = new BGPNettyGroups(); serverRegistry = new StrictBGPPeerRegistry(); - serverDispatcher = new BGPDispatcherImpl(context, boss, worker, serverRegistry); + serverDispatcher = new BGPDispatcherImpl(context, groups, serverRegistry); doReturn(Mockito.mock(ClusterSingletonServiceRegistration.class)).when(clusterSingletonServiceProvider) .registerClusterSingletonService(any(ClusterSingletonService.class)); @@ -165,14 +158,7 @@ public abstract class AbstractAddPathTest extends DefaultRibPoliciesMockTest { @Override @After public void tearDown() throws Exception { - serverDispatcher.close(); - if (!Epoll.isAvailable()) { - worker.shutdownGracefully(0, 0, TimeUnit.SECONDS); - boss.shutdownGracefully(0, 0, TimeUnit.SECONDS); - } - clientDispatchers.forEach(BGPDispatcherImpl::close); - clientDispatchers = null; - + groups.close(); super.tearDown(); } @@ -240,8 +226,7 @@ public abstract class AbstractAddPathTest extends DefaultRibPoliciesMockTest { final SimpleSessionListener sessionListener, final AsNumber remoteAsNumber) throws InterruptedException { final StrictBGPPeerRegistry clientRegistry = new StrictBGPPeerRegistry(); - final BGPDispatcherImpl clientDispatcher = new BGPDispatcherImpl(context, boss, worker, - clientRegistry); + final BGPDispatcherImpl clientDispatcher = new BGPDispatcherImpl(context, groups, clientRegistry); clientDispatchers.add(clientDispatcher); clientRegistry.addPeer(new IpAddressNoZone(new Ipv4AddressNoZone(RIB_ID)), sessionListener, diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPDispatcherTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPDispatcherTest.java index cb5e0524ac..93eb74612d 100644 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPDispatcherTest.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPDispatcherTest.java @@ -12,14 +12,10 @@ import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess; import com.google.common.base.Preconditions; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.Epoll; -import io.netty.channel.nio.NioEventLoopGroup; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; -import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil; @@ -58,37 +54,29 @@ public class AbstractBGPDispatcherTest { protected BGPDispatcherImpl serverDispatcher; protected SimpleSessionListener serverListener; protected InetSocketAddress clientAddress; - private EventLoopGroup boss; - private EventLoopGroup worker; + private BGPNettyGroups groups; @Before public void setUp() { - if (!Epoll.isAvailable()) { - boss = new NioEventLoopGroup(); - worker = new NioEventLoopGroup(); - } + groups = new BGPNettyGroups(); registry = new StrictBGPPeerRegistry(); clientListener = new SimpleSessionListener(); serverListener = new SimpleSessionListener(); final BGPExtensionConsumerContext ctx = ServiceLoader.load(BGPExtensionConsumerContext.class).findFirst() .orElseThrow(); - serverDispatcher = new BGPDispatcherImpl(ctx, boss, worker, registry); + serverDispatcher = new BGPDispatcherImpl(ctx, groups, registry); clientAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(); final IpAddressNoZone clientPeerIp = new IpAddressNoZone(new Ipv4AddressNoZone( clientAddress.getAddress().getHostAddress())); registry.addPeer(clientPeerIp, clientListener, createPreferences(clientAddress)); - clientDispatcher = new BGPDispatcherImpl(ctx, boss, worker, registry); + clientDispatcher = new BGPDispatcherImpl(ctx, groups, registry); } @After public void tearDown() throws Exception { - serverDispatcher.close(); registry.close(); - if (!Epoll.isAvailable()) { - worker.shutdownGracefully(0, 0, TimeUnit.SECONDS); - boss.shutdownGracefully(0, 0, TimeUnit.SECONDS); - } + groups.close(); } protected BGPSessionPreferences createPreferences(final InetSocketAddress socketAddress) { diff --git a/bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/BGPTestTool.java b/bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/BGPTestTool.java index c03f3a221d..3d289d22b1 100644 --- a/bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/BGPTestTool.java +++ b/bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/BGPTestTool.java @@ -11,7 +11,6 @@ import static org.opendaylight.protocol.bgp.testtool.BGPPeerBuilder.createPeer; import com.google.common.collect.Lists; import com.google.common.net.InetAddresses; -import io.netty.channel.nio.NioEventLoopGroup; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -21,6 +20,7 @@ import java.util.Map; import java.util.ServiceLoader; import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionConsumerContext; import org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImpl; +import org.opendaylight.protocol.bgp.rib.impl.BGPNettyGroups; import org.opendaylight.protocol.bgp.rib.impl.StrictBGPPeerRegistry; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher; import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener; @@ -78,7 +78,7 @@ final class BGPTestTool { final BGPSessionListener sessionListener = new TestingListener(arguments.getNumberOfPrefixes(), arguments.getExtendedCommunities(), arguments.getMultiPathSupport()); - this.listeners.put(address.getHostAddress(), sessionListener); + listeners.put(address.getHostAddress(), sessionListener); createPeer(dispatcher, arguments, new InetSocketAddress(address, port), sessionListener, bgpParameters); numberOfSpeakers--; address = InetAddresses.increment(address); @@ -87,7 +87,7 @@ final class BGPTestTool { private static BGPDispatcher initializeActivator() { return new BGPDispatcherImpl(ServiceLoader.load(BGPExtensionConsumerContext.class).findFirst().orElseThrow(), - new NioEventLoopGroup(), new NioEventLoopGroup(), new StrictBGPPeerRegistry()); + new BGPNettyGroups(), new StrictBGPPeerRegistry()); } private static OptionalCapabilities createMPCapability(final AddressFamily afi, @@ -130,7 +130,7 @@ final class BGPTestTool { } void printCount(final String localAddress) { - final BGPSessionListener listener = this.listeners.get(localAddress); + final BGPSessionListener listener = listeners.get(localAddress); if (listener != null) { ((TestingListener) listener).printCount(localAddress); } -- 2.36.6