X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=bgp%2Frib-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fbgp%2Frib%2Fimpl%2FBGPDispatcherImpl.java;h=5160d566b496ac3836ba4a0951f5f06ecd5a118f;hb=refs%2Fheads%2Fstable%2Fboron;hp=67f845dc9975275ab3e5091b436eba97a01c6c35;hpb=662a4449d1e5ff3d87859ea997d0ecaef7cfaac3;p=bgpcep.git diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java index 67f845dc99..5160d566b4 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java @@ -7,145 +7,218 @@ */ package org.opendaylight.protocol.bgp.rib.impl; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollChannelOption; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollMode; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.util.Timer; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; -import org.opendaylight.bgpcep.tcpmd5.KeyMapping; -import org.opendaylight.bgpcep.tcpmd5.netty.MD5ChannelFactory; -import org.opendaylight.bgpcep.tcpmd5.netty.MD5ChannelOption; -import org.opendaylight.bgpcep.tcpmd5.netty.MD5ServerChannelFactory; -import org.opendaylight.protocol.bgp.parser.BGPSessionListener; +import java.util.concurrent.TimeUnit; import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry; +import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise; +import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry; -import org.opendaylight.protocol.bgp.rib.impl.spi.BGPServerDispatcher; -import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator; -import org.opendaylight.protocol.framework.AbstractDispatcher; -import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.protocol.framework.ReconnectStrategyFactory; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber; +import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer; +import org.opendaylight.protocol.bgp.rib.spi.BGPSession; +import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory; +import org.opendaylight.protocol.concepts.KeyMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of BGPDispatcher. */ -public final class BGPDispatcherImpl extends AbstractDispatcher implements BGPDispatcher, BGPServerDispatcher, AutoCloseable { - private final MD5ServerChannelFactory scf; - private final MD5ChannelFactory cf; - private final BGPHandlerFactory hf; - private final Timer timer; - private KeyMapping keys; - - public BGPDispatcherImpl(final MessageRegistry messageRegistry, final Timer timer, final EventLoopGroup bossGroup, - final EventLoopGroup workerGroup) { - this(messageRegistry, timer, bossGroup, workerGroup, null, null); +public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class); + private static final int SOCKET_BACKLOG_SIZE = 128; + private static final int HIGH_WATER_MARK = 256 * 1024; + private static final int LOW_WATER_MARK = 128 * 1024; + private static final long TIMEOUT = 10; + + private final BGPHandlerFactory handlerFactory; + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + + public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) { + if (Epoll.isAvailable()) { + this.bossGroup = new EpollEventLoopGroup(); + this.workerGroup = new EpollEventLoopGroup(); + } else { + this.bossGroup = Preconditions.checkNotNull(bossGroup); + this.workerGroup = Preconditions.checkNotNull(workerGroup); + } + this.handlerFactory = new BGPHandlerFactory(messageRegistry); } - public BGPDispatcherImpl(final MessageRegistry messageRegistry, final Timer timer, final EventLoopGroup bossGroup, - final EventLoopGroup workerGroup, final MD5ChannelFactory cf, final MD5ServerChannelFactory scf) { - super(bossGroup, workerGroup); - this.timer = Preconditions.checkNotNull(timer); - this.hf = new BGPHandlerFactory(messageRegistry); - this.cf = cf; - this.scf = scf; + @Override + public synchronized Future createClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry listener, final int retryTimer) { + return createClient(remoteAddress, listener, retryTimer, createClientBootStrap(Optional.absent(), false)); } - @Override - public synchronized Future createClient(final InetSocketAddress address, - final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy) { - final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.timer, remoteAs, listener); - return super.createClient(address, strategy, new PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders()); - ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(null, ch, promise)); - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders()); + private synchronized Future createClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry listener, final int retryTimer, + final Bootstrap clientBootStrap) { + final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(listener); + final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf); + + final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(remoteAddress, retryTimer, clientBootStrap, listener); + clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise)); + sessionPromise.connect(); + LOG.debug("Client created."); + return sessionPromise; + } + + @VisibleForTesting + public synchronized Future createClient(final InetSocketAddress localAddress, final InetSocketAddress remoteAddress, + final BGPPeerRegistry strictBGPPeerRegistry, final int retryTimer, final boolean reuseAddress) { + final Bootstrap clientBootStrap = createClientBootStrap(Optional.absent(), reuseAddress); + clientBootStrap.localAddress(localAddress); + return createClient(remoteAddress, strictBGPPeerRegistry, retryTimer, clientBootStrap); + } + + private synchronized Bootstrap createClientBootStrap(final Optional keys, final boolean reuseAddress) { + final Bootstrap bootstrap = new Bootstrap(); + if (Epoll.isAvailable()) { + bootstrap.channel(EpollSocketChannel.class); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } else { + bootstrap.channel(NioSocketChannel.class); + } + if (keys.isPresent()) { + if (Epoll.isAvailable()) { + bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.get()); + } else { + throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause()); } - }); + } + + // Make sure we are doing round-robin processing + bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1); + bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); + bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK); + bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK); + bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); + + if (bootstrap.group() == null) { + bootstrap.group(this.workerGroup); + } + + return bootstrap; } @Override - public Future createReconnectingClient(final InetSocketAddress address, - final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategyFactory connectStrategyFactory, - final ReconnectStrategyFactory reestablishStrategyFactory) { - return this.createReconnectingClient(address, remoteAs, listener, connectStrategyFactory, reestablishStrategyFactory, - null); + public synchronized void close() throws InterruptedException { + if (Epoll.isAvailable()) { + LOG.debug("Closing Dispatcher"); + this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); + this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS); + } } @Override - public void close() { + public synchronized Future createReconnectingClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry peerRegistry, + final int retryTimer, final Optional keys) { + return createReconnectingClient(remoteAddress, peerRegistry, retryTimer, keys, null, false); } - @Override - public synchronized Future createReconnectingClient(final InetSocketAddress address, - final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory connectStrategyFactory, - final ReconnectStrategyFactory reestablishStrategyFactory, final KeyMapping keys) { - final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.timer, remoteAs, peerRegistry); - - this.keys = keys; - final Future ret = super.createReconnectingClient(address, connectStrategyFactory, - reestablishStrategyFactory.createReconnectStrategy(), new PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders()); - ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(null, ch, promise)); - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders()); - } - }); - this.keys = null; - - return ret; + @VisibleForTesting + protected synchronized Future createReconnectingClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry peerRegistry, + final int retryTimer, final Optional keys, final InetSocketAddress localAddress, final boolean reuseAddress) { + final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry); + final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress); + bootstrap.localAddress(localAddress); + final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, remoteAddress, + retryTimer, bootstrap, peerRegistry, BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf)); + reconnectPromise.connect(); + return reconnectPromise; } @Override - public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator) { - return this.createServer(registry, address, sessionValidator, null); + public synchronized ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress serverAddress) { + final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(registry); + final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf); + final ServerBootstrap serverBootstrap = createServerBootstrap(initializer); + final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress); + LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress); + return channelFuture; } - @Override - public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator, final KeyMapping keys) { - final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(this.timer, sessionValidator, registry); - - this.keys = keys; - final ChannelFuture ret = super.createServer(address, new PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders()); - ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(null, ch, promise)); - ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders()); - } - }); - this.keys = null; + private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) { + final ServerBootstrap serverBootstrap = new ServerBootstrap(); + if (Epoll.isAvailable()) { + serverBootstrap.channel(EpollServerSocketChannel.class); + serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } else { + serverBootstrap.channel(NioServerSocketChannel.class); + } + final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer); + serverBootstrap.childHandler(serverChannelHandler); - return ret; - } + serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE); + serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK); + serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK); - @Override - protected void customizeBootstrap(final Bootstrap b) { - if (keys != null && !keys.isEmpty()) { - if (cf == null) { - throw new UnsupportedOperationException("No key access instance available, cannot use key mapping"); - } - b.channelFactory(cf); - b.option(MD5ChannelOption.TCP_MD5SIG, keys); + // Make sure we are doing round-robin processing + serverBootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1); + + if (serverBootstrap.group() == null) { + serverBootstrap.group(this.bossGroup, this.workerGroup); } + return serverBootstrap; } - @Override - protected void customizeBootstrap(final ServerBootstrap b) { - if (keys != null && !keys.isEmpty()) { - if (scf == null) { - throw new UnsupportedOperationException("No key access instance available, cannot use key mapping"); - } - b.channelFactory(scf); - b.option(MD5ChannelOption.TCP_MD5SIG, keys); + private static final class BGPChannel { + private static final String NEGOTIATOR = "negotiator"; + + private BGPChannel() { + + } + + static ChannelPipelineInitializer + createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) { + return (channel, promise) -> { + channel.pipeline().addLast(hf.getDecoders()); + channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise)); + channel.pipeline().addLast(hf.getEncoders()); + }; } - } -} + static ChannelHandler createClientChannelHandler(final ChannelPipelineInitializer initializer, final Promise promise) { + return new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel channel) { + initializer.initializeChannel(channel, promise); + } + }; + } + + static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) { + return new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel channel) { + initializer.initializeChannel(channel, new DefaultPromise(GlobalEventExecutor.INSTANCE)); + } + }; + } + } +} \ No newline at end of file