X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=bgp%2Frib-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fbgp%2Frib%2Fimpl%2FBGPDispatcherImpl.java;h=5160d566b496ac3836ba4a0951f5f06ecd5a118f;hb=refs%2Fheads%2Fstable%2Fboron;hp=54d75e0bcbd1b7983d87722b815efb0e2f433bae;hpb=365d6585c96ae16d4e86cb035bc183c7207cc53c;p=bgpcep.git diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java index 54d75e0bcb..5160d566b4 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java @@ -7,52 +7,218 @@ */ package org.opendaylight.protocol.bgp.rib.impl; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollChannelOption; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollMode; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timer; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; -import org.opendaylight.protocol.bgp.parser.BGPMessageFactory; -import org.opendaylight.protocol.bgp.parser.BGPSessionListener; -import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher; -import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; -import org.opendaylight.protocol.framework.AbstractDispatcher; -import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.protocol.framework.SessionListenerFactory; - import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry; +import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise; +import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise; +import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher; +import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry; +import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer; +import org.opendaylight.protocol.bgp.rib.spi.BGPSession; +import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory; +import org.opendaylight.protocol.concepts.KeyMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of BGPDispatcher. */ -public final class BGPDispatcherImpl extends AbstractDispatcher implements BGPDispatcher { - private final Timer timer = new HashedWheelTimer(); - - private final BGPHandlerFactory hf; - - public BGPDispatcherImpl(final BGPMessageFactory parser, EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - super(bossGroup, workerGroup); - this.hf = new BGPHandlerFactory(parser); - } - - @Override - public Future createClient(final InetSocketAddress address, final BGPSessionPreferences preferences, - final BGPSessionListener listener, final ReconnectStrategy strategy) { - final BGPSessionNegotiatorFactory snf = new BGPSessionNegotiatorFactory(this.timer, preferences); - final SessionListenerFactory slf = new SessionListenerFactory() { - @Override - public BGPSessionListener getSessionListener() { - return listener; - } - }; - return super.createClient(address, strategy, new PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { - ch.pipeline().addLast(hf.getDecoders()); - ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(slf, ch, promise)); - ch.pipeline().addLast(hf.getEncoders()); - } - }); - } -} +public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class); + private static final int SOCKET_BACKLOG_SIZE = 128; + private static final int HIGH_WATER_MARK = 256 * 1024; + private static final int LOW_WATER_MARK = 128 * 1024; + private static final long TIMEOUT = 10; + + private final BGPHandlerFactory handlerFactory; + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + + public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) { + if (Epoll.isAvailable()) { + this.bossGroup = new EpollEventLoopGroup(); + this.workerGroup = new EpollEventLoopGroup(); + } else { + this.bossGroup = Preconditions.checkNotNull(bossGroup); + this.workerGroup = Preconditions.checkNotNull(workerGroup); + } + this.handlerFactory = new BGPHandlerFactory(messageRegistry); + } + + @Override + public synchronized Future createClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry listener, final int retryTimer) { + return createClient(remoteAddress, listener, retryTimer, createClientBootStrap(Optional.absent(), false)); + } + + private synchronized Future createClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry listener, final int retryTimer, + final Bootstrap clientBootStrap) { + final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(listener); + final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf); + + final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(remoteAddress, retryTimer, clientBootStrap, listener); + clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise)); + sessionPromise.connect(); + LOG.debug("Client created."); + return sessionPromise; + } + + @VisibleForTesting + public synchronized Future createClient(final InetSocketAddress localAddress, final InetSocketAddress remoteAddress, + final BGPPeerRegistry strictBGPPeerRegistry, final int retryTimer, final boolean reuseAddress) { + final Bootstrap clientBootStrap = createClientBootStrap(Optional.absent(), reuseAddress); + clientBootStrap.localAddress(localAddress); + return createClient(remoteAddress, strictBGPPeerRegistry, retryTimer, clientBootStrap); + } + + private synchronized Bootstrap createClientBootStrap(final Optional keys, final boolean reuseAddress) { + final Bootstrap bootstrap = new Bootstrap(); + if (Epoll.isAvailable()) { + bootstrap.channel(EpollSocketChannel.class); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } else { + bootstrap.channel(NioSocketChannel.class); + } + if (keys.isPresent()) { + if (Epoll.isAvailable()) { + bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.get()); + } else { + throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause()); + } + } + + // Make sure we are doing round-robin processing + bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1); + bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); + bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK); + bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK); + bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); + + if (bootstrap.group() == null) { + bootstrap.group(this.workerGroup); + } + + return bootstrap; + } + + @Override + public synchronized void close() throws InterruptedException { + if (Epoll.isAvailable()) { + LOG.debug("Closing Dispatcher"); + this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); + this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); + } + } + + @Override + public synchronized Future createReconnectingClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry peerRegistry, + final int retryTimer, final Optional keys) { + return createReconnectingClient(remoteAddress, peerRegistry, retryTimer, keys, null, false); + } + + @VisibleForTesting + protected synchronized Future createReconnectingClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry peerRegistry, + final int retryTimer, final Optional keys, final InetSocketAddress localAddress, final boolean reuseAddress) { + final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry); + final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress); + bootstrap.localAddress(localAddress); + final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, remoteAddress, + retryTimer, bootstrap, peerRegistry, BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf)); + reconnectPromise.connect(); + return reconnectPromise; + } + + @Override + public synchronized ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress serverAddress) { + final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(registry); + final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf); + final ServerBootstrap serverBootstrap = createServerBootstrap(initializer); + final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress); + LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress); + return channelFuture; + } + + private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) { + final ServerBootstrap serverBootstrap = new ServerBootstrap(); + if (Epoll.isAvailable()) { + serverBootstrap.channel(EpollServerSocketChannel.class); + serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } else { + serverBootstrap.channel(NioServerSocketChannel.class); + } + final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer); + serverBootstrap.childHandler(serverChannelHandler); + + serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE); + serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK); + serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK); + + // Make sure we are doing round-robin processing + serverBootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1); + + if (serverBootstrap.group() == null) { + serverBootstrap.group(this.bossGroup, this.workerGroup); + } + return serverBootstrap; + } + + private static final class BGPChannel { + private static final String NEGOTIATOR = "negotiator"; + + private BGPChannel() { + + } + + static ChannelPipelineInitializer + createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) { + return (channel, promise) -> { + channel.pipeline().addLast(hf.getDecoders()); + channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise)); + channel.pipeline().addLast(hf.getEncoders()); + }; + } + + static ChannelHandler createClientChannelHandler(final ChannelPipelineInitializer initializer, final Promise promise) { + return new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel channel) { + initializer.initializeChannel(channel, promise); + } + }; + } + + static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) { + return new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel channel) { + initializer.initializeChannel(channel, new DefaultPromise(GlobalEventExecutor.INSTANCE)); + } + }; + } + } +} \ No newline at end of file