X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=bgp%2Frib-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fbgp%2Frib%2Fimpl%2FBGPDispatcherImpl.java;h=baa2073ead0d5d347043cb4a4ccd436288d46f75;hb=9374c72792fabebf491261a4e07a4113fb643510;hp=54d75e0bcbd1b7983d87722b815efb0e2f433bae;hpb=78837d66c9d59783e6519cd37242275475c86600;p=bgpcep.git diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java index 54d75e0bcb..baa2073ead 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java @@ -7,52 +7,211 @@ */ package org.opendaylight.protocol.bgp.rib.impl; +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timer; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; -import org.opendaylight.protocol.bgp.parser.BGPMessageFactory; -import org.opendaylight.protocol.bgp.parser.BGPSessionListener; +import java.net.InetSocketAddress; +import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry; +import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise; +import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher; -import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; -import org.opendaylight.protocol.framework.AbstractDispatcher; +import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry; +import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator; +import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory; import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.protocol.framework.SessionListenerFactory; - -import java.net.InetSocketAddress; +import org.opendaylight.protocol.framework.ReconnectStrategyFactory; +import org.opendaylight.tcpmd5.api.KeyMapping; +import org.opendaylight.tcpmd5.netty.MD5ChannelFactory; +import org.opendaylight.tcpmd5.netty.MD5ChannelOption; +import org.opendaylight.tcpmd5.netty.MD5ServerChannelFactory; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of BGPDispatcher. */ -public final class BGPDispatcherImpl extends AbstractDispatcher implements BGPDispatcher { - private final Timer timer = new HashedWheelTimer(); - - private final BGPHandlerFactory hf; - - public BGPDispatcherImpl(final BGPMessageFactory parser, EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - super(bossGroup, workerGroup); - this.hf = new BGPHandlerFactory(parser); - } - - @Override - public Future createClient(final InetSocketAddress address, final BGPSessionPreferences preferences, - final BGPSessionListener listener, final ReconnectStrategy strategy) { - final BGPSessionNegotiatorFactory snf = new BGPSessionNegotiatorFactory(this.timer, preferences); - final SessionListenerFactory slf = new SessionListenerFactory() { - @Override - public BGPSessionListener getSessionListener() { - return listener; - } - }; - return super.createClient(address, strategy, new PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { - ch.pipeline().addLast(hf.getDecoders()); - ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(slf, ch, promise)); - ch.pipeline().addLast(hf.getEncoders()); - } - }); - } +public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class); + private final MD5ServerChannelFactory scf; + private final MD5ChannelFactory cf; + private final BGPHandlerFactory hf; + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private final EventExecutor executor; + private KeyMapping keys; + + public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) { + this(messageRegistry, bossGroup, workerGroup, null, null); + } + + public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MD5ChannelFactory cf, final MD5ServerChannelFactory scf) { + this.bossGroup = Preconditions.checkNotNull(bossGroup); + this.workerGroup = Preconditions.checkNotNull(workerGroup); + this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE); + this.hf = new BGPHandlerFactory(messageRegistry); + this.cf = cf; + this.scf = scf; + this.keys = null; + } + + @Override + public synchronized Future createClient(final InetSocketAddress address, + final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy) { + final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, listener); + final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer + (BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders()); + + final Bootstrap b = new Bootstrap(); + final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, b); + b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true)); + b.handler(BGPChannel.createChannelInitializer(initializer, p)); + this.customizeBootstrap(b); + this.setWorkerGroup(b); + p.connect(); + LOG.debug("Client created."); + return p; + } + + @Override + public void close() { + try { + this.workerGroup.shutdownGracefully(); + } finally { + this.bossGroup.shutdownGracefully(); + } + } + + @Override + public synchronized Future createReconnectingClient(final InetSocketAddress address, + final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory connectStrategyFactory, + final KeyMapping keys) { + final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, peerRegistry); + this.keys = keys; + + final Bootstrap b = new Bootstrap(); + final BGPReconnectPromise p = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, address, + connectStrategyFactory, b, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders())); + b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true)); + this.customizeBootstrap(b); + this.setWorkerGroup(b); + p.connect(); + + this.keys = null; + + return p; + } + + @Override + public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator) { + final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(sessionValidator, registry); + final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer + (BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders()); + final ServerBootstrap b = new ServerBootstrap(); + b.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor))); + b.option(ChannelOption.SO_BACKLOG, Integer.valueOf(128)); + b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + this.customizeBootstrap(b); + + final ChannelFuture f = b.bind(address); + LOG.debug("Initiated server {} at {}.", f, address); + return f; + } + + protected void customizeBootstrap(final Bootstrap b) { + if (this.keys != null && !this.keys.isEmpty()) { + if (this.cf == null) { + throw new UnsupportedOperationException("No key access instance available, cannot use key mapping"); + } + b.channelFactory(this.cf); + b.option(MD5ChannelOption.TCP_MD5SIG, this.keys); + } + + // Make sure we are doing round-robin processing + b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1); + } + + private void customizeBootstrap(final ServerBootstrap b) { + if (this.keys != null && !this.keys.isEmpty()) { + if (this.scf == null) { + throw new UnsupportedOperationException("No key access instance available, cannot use key mapping"); + } + b.channelFactory(this.scf); + b.option(MD5ChannelOption.TCP_MD5SIG, this.keys); + } + + // Make sure we are doing round-robin processing + b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1); + + if (b.group() == null) { + b.group(this.bossGroup, this.workerGroup); + } + + try { + b.channel(NioServerSocketChannel.class); + } catch (IllegalStateException e) { + LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); + } + } + + private void setWorkerGroup(final Bootstrap b) { + if (b.group() == null) { + b.group(this.workerGroup); + } + try { + b.channel(NioSocketChannel.class); + } catch (IllegalStateException e) { + LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); + } + } + + public interface ChannelPipelineInitializer { + void initializeChannel(SocketChannel socketChannel, Promise promise); + } + + public static class BGPChannel { + private static final String NEGOTIATOR = "negotiator"; + + private BGPChannel() { + + } + + public static ChannelPipelineInitializer createChannelPipelineInitializer(final ChannelHandler[] channelDecoder, + final T snf, + final ChannelHandler[] channelEncoder) { + return new ChannelPipelineInitializer() { + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise) { + ch.pipeline().addLast(channelDecoder); + ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(ch, promise)); + ch.pipeline().addLast(channelEncoder); + } + }; + } + + public static ChannelHandler createChannelInitializer(final ChannelPipelineInitializer initializer, final Promise promise) { + return new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + initializer.initializeChannel(ch, promise); + } + }; + } + } } + +