X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=bgp%2Frib-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fbgp%2Frib%2Fimpl%2FBGPDispatcherImpl.java;h=5ac92300d8e682da9471391e0a382b808b43c99a;hb=bc62bc6768bc8f91f929f61d1736331eef0f32b3;hp=a966ab53c8a3cb1c526a5cb807c7643da0f2dc94;hpb=c6758c94aa35a2e034b84dd3b2b26f7d7f61ac8a;p=bgpcep.git diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java index a966ab53c8..5ac92300d8 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java @@ -7,140 +7,225 @@ */ package org.opendaylight.protocol.bgp.rib.impl; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollChannelOption; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollMode; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; - import java.net.InetSocketAddress; - -import org.opendaylight.bgpcep.tcpmd5.KeyMapping; -import org.opendaylight.bgpcep.tcpmd5.netty.MD5ChannelFactory; -import org.opendaylight.bgpcep.tcpmd5.netty.MD5ChannelOption; -import org.opendaylight.bgpcep.tcpmd5.netty.MD5ServerChannelFactory; -import org.opendaylight.protocol.bgp.parser.BGPSessionListener; +import java.util.concurrent.TimeUnit; import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry; +import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise; +import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry; -import org.opendaylight.protocol.bgp.rib.impl.spi.BGPServerDispatcher; -import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator; -import org.opendaylight.protocol.framework.AbstractDispatcher; -import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.protocol.framework.ReconnectStrategyFactory; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber; +import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer; +import org.opendaylight.protocol.bgp.rib.spi.BGPSession; +import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory; +import org.opendaylight.protocol.concepts.KeyMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of BGPDispatcher. */ -public final class BGPDispatcherImpl extends AbstractDispatcher implements BGPDispatcher, BGPServerDispatcher, AutoCloseable { - private final MD5ServerChannelFactory scf; - private final MD5ChannelFactory cf; - private final BGPHandlerFactory hf; - private KeyMapping keys; - - public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) { - this(messageRegistry, bossGroup, workerGroup, null, null); +public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class); + private static final int SOCKET_BACKLOG_SIZE = 128; + private static final int FIX_BUFFER_SIZE = 1; + private static final long TIMEOUT = 10; + + private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024); + + private final BGPHandlerFactory handlerFactory; + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private final BGPPeerRegistry bgpPeerRegistry; + + public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, + final EventLoopGroup workerGroup, final BGPPeerRegistry bgpPeerRegistry) { + if (Epoll.isAvailable()) { + this.bossGroup = new EpollEventLoopGroup(); + this.workerGroup = new EpollEventLoopGroup(); + } else { + this.bossGroup = requireNonNull(bossGroup); + this.workerGroup = requireNonNull(workerGroup); + } + this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry); + this.handlerFactory = new BGPHandlerFactory(messageRegistry); } - public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MD5ChannelFactory cf, final MD5ServerChannelFactory scf) { - super(bossGroup, workerGroup); - this.hf = new BGPHandlerFactory(messageRegistry); - this.cf = cf; - this.scf = scf; + @VisibleForTesting + public synchronized Future createClient(final InetSocketAddress localAddress, + final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) { + final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.getKeyMapping(), reuseAddress, localAddress); + final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry); + final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer( + this.handlerFactory, snf); + + final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise<>(remoteAddress, + retryTimer, clientBootStrap, this.bgpPeerRegistry); + clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise)); + sessionPromise.connect(); + LOG.debug("Client created."); + return sessionPromise; } - @Override - public synchronized Future createClient(final InetSocketAddress address, - final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy) { - final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, listener); - return super.createClient(address, strategy, new PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders()); - ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(null, ch, promise)); - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders()); + private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress, + final InetSocketAddress localAddress) { + final Bootstrap bootstrap = new Bootstrap(); + if (Epoll.isAvailable()) { + bootstrap.channel(EpollSocketChannel.class); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } else { + bootstrap.channel(NioSocketChannel.class); + } + if (keys != null && !keys.isEmpty()) { + if (Epoll.isAvailable()) { + bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys); + } else { + throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause()); } - }); - } + } - @Override - public Future createReconnectingClient(final InetSocketAddress address, - final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategyFactory connectStrategyFactory, - final ReconnectStrategyFactory reestablishStrategyFactory) { - return this.createReconnectingClient(address, remoteAs, listener, connectStrategyFactory, reestablishStrategyFactory, - null); + // Make sure we are doing round-robin processing + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(FIX_BUFFER_SIZE)); + bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); + bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK); + bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); + + if (bootstrap.config().group() == null) { + bootstrap.group(this.workerGroup); + } + bootstrap.localAddress(localAddress); + + return bootstrap; } @Override - public void close() { + public synchronized void close() { + if (Epoll.isAvailable()) { + LOG.debug("Closing Dispatcher"); + this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); + this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); + } } @Override - public synchronized Future createReconnectingClient(final InetSocketAddress address, - final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory connectStrategyFactory, - final ReconnectStrategyFactory reestablishStrategyFactory, final KeyMapping keys) { - final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, peerRegistry); - - this.keys = keys; - final Future ret = super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategyFactory.createReconnectStrategy(), new PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders()); - ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(null, ch, promise)); - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders()); - } - }); - this.keys = null; + public synchronized Future createReconnectingClient(final InetSocketAddress remoteAddress, + final InetSocketAddress localAddress, final int retryTimer, final KeyMapping keys) { + return createReconnectingClient(remoteAddress, retryTimer, keys, localAddress, false); + } - return ret; + @VisibleForTesting + synchronized Future createReconnectingClient(final InetSocketAddress remoteAddress, + final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress, + final boolean reuseAddress) { + final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry); + final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress, localAddress); + final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise<>(GlobalEventExecutor.INSTANCE, + remoteAddress, retryTimer, bootstrap, this.bgpPeerRegistry, + BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf)); + reconnectPromise.connect(); + return reconnectPromise; } @Override - public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator) { - return this.createServer(registry, address, sessionValidator, null); + public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) { + final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(this.bgpPeerRegistry); + final ChannelPipelineInitializer initializer = BGPChannel. + createChannelPipelineInitializer(this.handlerFactory, snf); + final ServerBootstrap serverBootstrap = createServerBootstrap(initializer); + final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress); + LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress); + return channelFuture; } @Override - public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator, final KeyMapping keys) { - final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(sessionValidator, registry); - - this.keys = keys; - final ChannelFuture ret = super.createServer(address, new PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders()); - ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(null, ch, promise)); - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders()); - } - }); - this.keys = null; - - return ret; + public BGPPeerRegistry getBGPPeerRegistry() { + return this.bgpPeerRegistry; } - @Override - protected void customizeBootstrap(final Bootstrap b) { - if (keys != null && !keys.isEmpty()) { - if (cf == null) { - throw new UnsupportedOperationException("No key access instance available, cannot use key mapping"); - } - b.channelFactory(cf); - b.option(MD5ChannelOption.TCP_MD5SIG, keys); + private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) { + final ServerBootstrap serverBootstrap = new ServerBootstrap(); + if (Epoll.isAvailable()) { + serverBootstrap.channel(EpollServerSocketChannel.class); + serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } else { + serverBootstrap.channel(NioServerSocketChannel.class); } - } + final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer); + serverBootstrap.childHandler(serverChannelHandler); - @Override - protected void customizeBootstrap(final ServerBootstrap b) { - if (keys != null && !keys.isEmpty()) { - if (scf == null) { - throw new UnsupportedOperationException("No key access instance available, cannot use key mapping"); - } - b.channelFactory(scf); - b.option(MD5ChannelOption.TCP_MD5SIG, keys); + serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE); + serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK); + + // Make sure we are doing round-robin processing + serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(FIX_BUFFER_SIZE)); + + if (serverBootstrap.config().group() == null) { + serverBootstrap.group(this.bossGroup, this.workerGroup); } + return serverBootstrap; } -} + private static final class BGPChannel { + private static final String NEGOTIATOR = "negotiator"; + + private BGPChannel() { + + } + + static > ChannelPipelineInitializer + createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) { + return (channel, promise) -> { + channel.pipeline().addLast(hf.getDecoders()); + channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise)); + channel.pipeline().addLast(hf.getEncoders()); + }; + } + + static ChannelHandler createClientChannelHandler( + final ChannelPipelineInitializer initializer, final Promise promise) { + return new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel channel) { + initializer.initializeChannel(channel, promise); + } + }; + } + + static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) { + return new ChannelInitializer() { + @Override + @SuppressWarnings("unchecked") + protected void initChannel(final SocketChannel channel) { + initializer.initializeChannel(channel, + new DefaultPromise(GlobalEventExecutor.INSTANCE)); + } + }; + } + } +} \ No newline at end of file