2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.bgp.rib.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.bootstrap.ServerBootstrap;
15 import io.netty.buffer.PooledByteBufAllocator;
16 import io.netty.channel.AdaptiveRecvByteBufAllocator;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandler;
19 import io.netty.channel.ChannelInitializer;
20 import io.netty.channel.ChannelOption;
21 import io.netty.channel.EventLoopGroup;
22 import io.netty.channel.RecvByteBufAllocator;
23 import io.netty.channel.WriteBufferWaterMark;
24 import io.netty.channel.epoll.Epoll;
25 import io.netty.channel.epoll.EpollChannelOption;
26 import io.netty.channel.epoll.EpollEventLoopGroup;
27 import io.netty.channel.epoll.EpollMode;
28 import io.netty.channel.epoll.EpollServerSocketChannel;
29 import io.netty.channel.epoll.EpollSocketChannel;
30 import io.netty.channel.socket.SocketChannel;
31 import io.netty.channel.socket.nio.NioServerSocketChannel;
32 import io.netty.channel.socket.nio.NioSocketChannel;
33 import io.netty.util.concurrent.DefaultPromise;
34 import io.netty.util.concurrent.Future;
35 import io.netty.util.concurrent.GlobalEventExecutor;
36 import io.netty.util.concurrent.Promise;
37 import java.net.InetSocketAddress;
38 import java.util.concurrent.TimeUnit;
39 import javax.annotation.PreDestroy;
40 import javax.inject.Inject;
41 import javax.inject.Singleton;
42 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionConsumerContext;
43 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
44 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
45 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
46 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
47 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
48 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
49 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
50 import org.opendaylight.protocol.concepts.KeyMapping;
51 import org.osgi.service.component.annotations.Activate;
52 import org.osgi.service.component.annotations.Component;
53 import org.osgi.service.component.annotations.Deactivate;
54 import org.osgi.service.component.annotations.Reference;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
59 * Implementation of BGPDispatcher.
62 @Component(immediate = true, service = BGPDispatcher.class)
63 public final class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
64 private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
65 private static final int SOCKET_BACKLOG_SIZE = 128;
66 private static final long TIMEOUT = 10;
68 private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024);
70 // An adaptive allocator, so we size our message buffers based on what we receive, but make sure we process one
71 // message at a time. This should be good enough for most cases, although we could optimize it a bit based on
72 // whether we actually negotiate use of large messages -- based on that the range of allocations can be constrained
73 // from the default 64-65536 range to 64-4096.
74 private static final RecvByteBufAllocator RECV_ALLOCATOR = new AdaptiveRecvByteBufAllocator().maxMessagesPerRead(1);
76 private final BGPHandlerFactory handlerFactory;
77 private final EventLoopGroup bossGroup;
78 private final EventLoopGroup workerGroup;
79 private final BGPPeerRegistry bgpPeerRegistry;
83 public BGPDispatcherImpl(@Reference final BGPExtensionConsumerContext extensions,
84 @Reference(target = "(type=global-boss-group)") final EventLoopGroup bossGroup,
85 @Reference(target = "(type=global-worker-group)") final EventLoopGroup workerGroup,
86 @Reference final BGPPeerRegistry bgpPeerRegistry) {
87 if (Epoll.isAvailable()) {
88 this.bossGroup = new EpollEventLoopGroup();
89 this.workerGroup = new EpollEventLoopGroup();
91 this.bossGroup = requireNonNull(bossGroup);
92 this.workerGroup = requireNonNull(workerGroup);
94 this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry);
95 this.handlerFactory = new BGPHandlerFactory(extensions.getMessageRegistry());
99 public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
100 final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
101 final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.getKeyMapping(), reuseAddress, localAddress);
102 final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
103 final ChannelPipelineInitializer<BGPSessionImpl> initializer = BGPChannel.createChannelPipelineInitializer(
104 this.handlerFactory, snf);
106 final BGPProtocolSessionPromise<BGPSessionImpl> sessionPromise = new BGPProtocolSessionPromise<>(remoteAddress,
107 retryTimer, clientBootStrap, this.bgpPeerRegistry);
108 clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
109 sessionPromise.connect();
110 LOG.debug("Client created.");
111 return sessionPromise;
114 private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress,
115 final InetSocketAddress localAddress) {
116 final Bootstrap bootstrap = new Bootstrap();
117 if (Epoll.isAvailable()) {
118 bootstrap.channel(EpollSocketChannel.class);
119 bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
121 bootstrap.channel(NioSocketChannel.class);
123 if (keys != null && !keys.isEmpty()) {
124 if (Epoll.isAvailable()) {
125 bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
127 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
131 // Make sure we are doing round-robin processing
132 bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
133 bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
134 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
135 bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
137 if (bootstrap.config().group() == null) {
138 bootstrap.group(this.workerGroup);
140 bootstrap.localAddress(localAddress);
148 public synchronized void close() {
149 if (Epoll.isAvailable()) {
150 LOG.debug("Closing Dispatcher");
151 this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
152 this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
157 public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
158 final InetSocketAddress localAddress, final int retryTimer, final KeyMapping keys) {
159 return createReconnectingClient(remoteAddress, retryTimer, keys, localAddress, false);
163 synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
164 final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress,
165 final boolean reuseAddress) {
166 final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
167 final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress, localAddress);
168 final BGPReconnectPromise<?> reconnectPromise = new BGPReconnectPromise<>(GlobalEventExecutor.INSTANCE,
169 remoteAddress, retryTimer, bootstrap, this.bgpPeerRegistry,
170 BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf));
171 reconnectPromise.connect();
172 return reconnectPromise;
176 public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) {
177 final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(this.bgpPeerRegistry);
178 final ChannelPipelineInitializer<?> initializer = BGPChannel.createChannelPipelineInitializer(
179 this.handlerFactory, snf);
180 final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
181 final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
182 LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
183 return channelFuture;
187 public BGPPeerRegistry getBGPPeerRegistry() {
188 return this.bgpPeerRegistry;
191 private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer<?> initializer) {
192 final ServerBootstrap serverBootstrap = new ServerBootstrap();
193 if (Epoll.isAvailable()) {
194 serverBootstrap.channel(EpollServerSocketChannel.class);
195 serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
197 serverBootstrap.channel(NioServerSocketChannel.class);
199 final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
200 serverBootstrap.childHandler(serverChannelHandler);
202 serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
203 serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
204 serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
206 // Make sure we are doing round-robin processing
207 serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
209 if (serverBootstrap.config().group() == null) {
210 serverBootstrap.group(this.bossGroup, this.workerGroup);
212 return serverBootstrap;
215 private static final class BGPChannel {
216 private static final String NEGOTIATOR = "negotiator";
218 private BGPChannel() {
222 static <S extends BGPSession, T extends BGPSessionNegotiatorFactory<S>> ChannelPipelineInitializer<S>
223 createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
224 return (channel, promise) -> {
225 channel.pipeline().addLast(hf.getDecoders());
226 channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
227 channel.pipeline().addLast(hf.getEncoders());
231 static <S extends BGPSession> ChannelHandler createClientChannelHandler(
232 final ChannelPipelineInitializer<S> initializer, final Promise<S> promise) {
233 return new ChannelInitializer<SocketChannel>() {
235 protected void initChannel(final SocketChannel channel) {
236 initializer.initializeChannel(channel, promise);
241 static <S extends BGPSession> ChannelHandler createServerChannelHandler(
242 final ChannelPipelineInitializer<S> initializer) {
243 return new ChannelInitializer<SocketChannel>() {
245 protected void initChannel(final SocketChannel channel) {
246 initializer.initializeChannel(channel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));