dee8a88b1ab669ca5775a13e601359332a53c0a7
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPDispatcherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.bootstrap.ServerBootstrap;
15 import io.netty.buffer.PooledByteBufAllocator;
16 import io.netty.channel.AdaptiveRecvByteBufAllocator;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandler;
19 import io.netty.channel.ChannelInitializer;
20 import io.netty.channel.ChannelOption;
21 import io.netty.channel.EventLoopGroup;
22 import io.netty.channel.RecvByteBufAllocator;
23 import io.netty.channel.WriteBufferWaterMark;
24 import io.netty.channel.epoll.Epoll;
25 import io.netty.channel.epoll.EpollChannelOption;
26 import io.netty.channel.epoll.EpollEventLoopGroup;
27 import io.netty.channel.epoll.EpollMode;
28 import io.netty.channel.epoll.EpollServerSocketChannel;
29 import io.netty.channel.epoll.EpollSocketChannel;
30 import io.netty.channel.socket.SocketChannel;
31 import io.netty.channel.socket.nio.NioServerSocketChannel;
32 import io.netty.channel.socket.nio.NioSocketChannel;
33 import io.netty.util.concurrent.DefaultPromise;
34 import io.netty.util.concurrent.Future;
35 import io.netty.util.concurrent.GlobalEventExecutor;
36 import io.netty.util.concurrent.Promise;
37 import java.net.InetSocketAddress;
38 import java.util.concurrent.TimeUnit;
39 import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
40 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
41 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
42 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
43 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
44 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
45 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
46 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
47 import org.opendaylight.protocol.concepts.KeyMapping;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  * Implementation of BGPDispatcher.
53  */
54 public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
55     private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
56     private static final int SOCKET_BACKLOG_SIZE = 128;
57     private static final long TIMEOUT = 10;
58
59     private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024);
60
61     // An adaptive allocator, so we size our message buffers based on what we receive, but make sure we process one
62     // message at a time. This should be good enough for most cases, although we could optimize it a bit based on
63     // whether we actually negotiate use of large messages -- based on that the range of allocations can be constrained
64     // from the default 64-65536 range to 64-4096.
65     private static final RecvByteBufAllocator RECV_ALLOCATOR = new AdaptiveRecvByteBufAllocator().maxMessagesPerRead(1);
66
67     private final BGPHandlerFactory handlerFactory;
68     private final EventLoopGroup bossGroup;
69     private final EventLoopGroup workerGroup;
70     private final BGPPeerRegistry bgpPeerRegistry;
71
72     public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup,
73             final EventLoopGroup workerGroup, final BGPPeerRegistry bgpPeerRegistry) {
74         if (Epoll.isAvailable()) {
75             this.bossGroup = new EpollEventLoopGroup();
76             this.workerGroup = new EpollEventLoopGroup();
77         } else {
78             this.bossGroup = requireNonNull(bossGroup);
79             this.workerGroup = requireNonNull(workerGroup);
80         }
81         this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry);
82         this.handlerFactory = new BGPHandlerFactory(messageRegistry);
83     }
84
85     @VisibleForTesting
86     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
87             final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
88         final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.getKeyMapping(), reuseAddress, localAddress);
89         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
90         final ChannelPipelineInitializer<BGPSessionImpl> initializer = BGPChannel.createChannelPipelineInitializer(
91                 this.handlerFactory, snf);
92
93         final BGPProtocolSessionPromise<BGPSessionImpl> sessionPromise = new BGPProtocolSessionPromise<>(remoteAddress,
94                 retryTimer, clientBootStrap, this.bgpPeerRegistry);
95         clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
96         sessionPromise.connect();
97         LOG.debug("Client created.");
98         return sessionPromise;
99     }
100
101     private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress,
102             final InetSocketAddress localAddress) {
103         final Bootstrap bootstrap = new Bootstrap();
104         if (Epoll.isAvailable()) {
105             bootstrap.channel(EpollSocketChannel.class);
106             bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
107         } else {
108             bootstrap.channel(NioSocketChannel.class);
109         }
110         if (keys != null && !keys.isEmpty()) {
111             if (Epoll.isAvailable()) {
112                 bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
113             } else {
114                 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
115             }
116         }
117
118         // Make sure we are doing round-robin processing
119         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
120         bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
121         bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
122         bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
123
124         if (bootstrap.config().group() == null) {
125             bootstrap.group(this.workerGroup);
126         }
127         bootstrap.localAddress(localAddress);
128
129         return bootstrap;
130     }
131
132     @Override
133     public synchronized void close() {
134         if (Epoll.isAvailable()) {
135             LOG.debug("Closing Dispatcher");
136             this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
137             this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
138         }
139     }
140
141     @Override
142     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
143             final InetSocketAddress localAddress, final int retryTimer, final KeyMapping keys) {
144         return createReconnectingClient(remoteAddress, retryTimer, keys, localAddress, false);
145     }
146
147     @VisibleForTesting
148     synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
149             final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress,
150             final boolean reuseAddress) {
151         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
152         final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress, localAddress);
153         final BGPReconnectPromise<?> reconnectPromise = new BGPReconnectPromise<>(GlobalEventExecutor.INSTANCE,
154                 remoteAddress, retryTimer, bootstrap, this.bgpPeerRegistry,
155                 BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf));
156         reconnectPromise.connect();
157         return reconnectPromise;
158     }
159
160     @Override
161     public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) {
162         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(this.bgpPeerRegistry);
163         final ChannelPipelineInitializer<?> initializer = BGPChannel.createChannelPipelineInitializer(
164             this.handlerFactory, snf);
165         final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
166         final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
167         LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
168         return channelFuture;
169     }
170
171     @Override
172     public BGPPeerRegistry getBGPPeerRegistry() {
173         return this.bgpPeerRegistry;
174     }
175
176     private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer<?> initializer) {
177         final ServerBootstrap serverBootstrap = new ServerBootstrap();
178         if (Epoll.isAvailable()) {
179             serverBootstrap.channel(EpollServerSocketChannel.class);
180             serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
181         } else {
182             serverBootstrap.channel(NioServerSocketChannel.class);
183         }
184         final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
185         serverBootstrap.childHandler(serverChannelHandler);
186
187         serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
188         serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
189         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
190
191         // Make sure we are doing round-robin processing
192         serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
193
194         if (serverBootstrap.config().group() == null) {
195             serverBootstrap.group(this.bossGroup, this.workerGroup);
196         }
197         return serverBootstrap;
198     }
199
200     private static final class BGPChannel {
201         private static final String NEGOTIATOR = "negotiator";
202
203         private BGPChannel() {
204
205         }
206
207         static <S extends BGPSession, T extends BGPSessionNegotiatorFactory<S>> ChannelPipelineInitializer<S>
208             createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
209             return (channel, promise) -> {
210                 channel.pipeline().addLast(hf.getDecoders());
211                 channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
212                 channel.pipeline().addLast(hf.getEncoders());
213             };
214         }
215
216         static <S extends BGPSession> ChannelHandler createClientChannelHandler(
217                 final ChannelPipelineInitializer<S> initializer, final Promise<S> promise) {
218             return new ChannelInitializer<SocketChannel>() {
219                 @Override
220                 protected void initChannel(final SocketChannel channel) {
221                     initializer.initializeChannel(channel, promise);
222                 }
223             };
224         }
225
226         static <S extends BGPSession> ChannelHandler createServerChannelHandler(
227                 final ChannelPipelineInitializer<S> initializer) {
228             return new ChannelInitializer<SocketChannel>() {
229                 @Override
230                 protected void initChannel(final SocketChannel channel) {
231                     initializer.initializeChannel(channel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
232                 }
233             };
234         }
235     }
236 }