Split out BGPDispatcherImpl
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPDispatcherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.bootstrap.ServerBootstrap;
15 import io.netty.buffer.PooledByteBufAllocator;
16 import io.netty.channel.AdaptiveRecvByteBufAllocator;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandler;
19 import io.netty.channel.ChannelInitializer;
20 import io.netty.channel.ChannelOption;
21 import io.netty.channel.EventLoopGroup;
22 import io.netty.channel.RecvByteBufAllocator;
23 import io.netty.channel.WriteBufferWaterMark;
24 import io.netty.channel.epoll.Epoll;
25 import io.netty.channel.epoll.EpollChannelOption;
26 import io.netty.channel.epoll.EpollEventLoopGroup;
27 import io.netty.channel.epoll.EpollMode;
28 import io.netty.channel.epoll.EpollServerSocketChannel;
29 import io.netty.channel.epoll.EpollSocketChannel;
30 import io.netty.channel.socket.SocketChannel;
31 import io.netty.channel.socket.nio.NioServerSocketChannel;
32 import io.netty.channel.socket.nio.NioSocketChannel;
33 import io.netty.util.concurrent.DefaultPromise;
34 import io.netty.util.concurrent.Future;
35 import io.netty.util.concurrent.GlobalEventExecutor;
36 import io.netty.util.concurrent.Promise;
37 import java.net.InetSocketAddress;
38 import java.util.concurrent.TimeUnit;
39 import javax.annotation.PreDestroy;
40 import javax.inject.Inject;
41 import javax.inject.Singleton;
42 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionConsumerContext;
43 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
44 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
45 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
46 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
47 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
48 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
49 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
50 import org.opendaylight.protocol.concepts.KeyMapping;
51 import org.osgi.service.component.annotations.Activate;
52 import org.osgi.service.component.annotations.Component;
53 import org.osgi.service.component.annotations.Deactivate;
54 import org.osgi.service.component.annotations.Reference;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * Implementation of BGPDispatcher.
60  */
61 @Singleton
62 @Component(immediate = true, service = BGPDispatcher.class)
63 public final class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
64     private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
65     private static final int SOCKET_BACKLOG_SIZE = 128;
66     private static final long TIMEOUT = 10;
67
68     private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024);
69
70     // An adaptive allocator, so we size our message buffers based on what we receive, but make sure we process one
71     // message at a time. This should be good enough for most cases, although we could optimize it a bit based on
72     // whether we actually negotiate use of large messages -- based on that the range of allocations can be constrained
73     // from the default 64-65536 range to 64-4096.
74     private static final RecvByteBufAllocator RECV_ALLOCATOR = new AdaptiveRecvByteBufAllocator().maxMessagesPerRead(1);
75
76     private final BGPHandlerFactory handlerFactory;
77     private final EventLoopGroup bossGroup;
78     private final EventLoopGroup workerGroup;
79     private final BGPPeerRegistry bgpPeerRegistry;
80
81     @Inject
82     @Activate
83     public BGPDispatcherImpl(@Reference final BGPExtensionConsumerContext extensions,
84             @Reference(target = "(type=global-boss-group)") final EventLoopGroup bossGroup,
85             @Reference(target = "(type=global-worker-group)") final EventLoopGroup workerGroup,
86             @Reference final BGPPeerRegistry bgpPeerRegistry) {
87         if (Epoll.isAvailable()) {
88             this.bossGroup = new EpollEventLoopGroup();
89             this.workerGroup = new EpollEventLoopGroup();
90         } else {
91             this.bossGroup = requireNonNull(bossGroup);
92             this.workerGroup = requireNonNull(workerGroup);
93         }
94         this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry);
95         this.handlerFactory = new BGPHandlerFactory(extensions.getMessageRegistry());
96     }
97
98     @VisibleForTesting
99     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
100             final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
101         final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.getKeyMapping(), reuseAddress, localAddress);
102         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
103         final ChannelPipelineInitializer<BGPSessionImpl> initializer = BGPChannel.createChannelPipelineInitializer(
104                 this.handlerFactory, snf);
105
106         final BGPProtocolSessionPromise<BGPSessionImpl> sessionPromise = new BGPProtocolSessionPromise<>(remoteAddress,
107                 retryTimer, clientBootStrap, this.bgpPeerRegistry);
108         clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
109         sessionPromise.connect();
110         LOG.debug("Client created.");
111         return sessionPromise;
112     }
113
114     private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress,
115             final InetSocketAddress localAddress) {
116         final Bootstrap bootstrap = new Bootstrap();
117         if (Epoll.isAvailable()) {
118             bootstrap.channel(EpollSocketChannel.class);
119             bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
120         } else {
121             bootstrap.channel(NioSocketChannel.class);
122         }
123         if (keys != null && !keys.isEmpty()) {
124             if (Epoll.isAvailable()) {
125                 bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
126             } else {
127                 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
128             }
129         }
130
131         // Make sure we are doing round-robin processing
132         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
133         bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
134         bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
135         bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
136
137         if (bootstrap.config().group() == null) {
138             bootstrap.group(this.workerGroup);
139         }
140         bootstrap.localAddress(localAddress);
141
142         return bootstrap;
143     }
144
145     @Deactivate
146     @PreDestroy
147     @Override
148     public synchronized void close() {
149         if (Epoll.isAvailable()) {
150             LOG.debug("Closing Dispatcher");
151             this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
152             this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
153         }
154     }
155
156     @Override
157     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
158             final InetSocketAddress localAddress, final int retryTimer, final KeyMapping keys) {
159         return createReconnectingClient(remoteAddress, retryTimer, keys, localAddress, false);
160     }
161
162     @VisibleForTesting
163     synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
164             final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress,
165             final boolean reuseAddress) {
166         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
167         final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress, localAddress);
168         final BGPReconnectPromise<?> reconnectPromise = new BGPReconnectPromise<>(GlobalEventExecutor.INSTANCE,
169                 remoteAddress, retryTimer, bootstrap, this.bgpPeerRegistry,
170                 BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf));
171         reconnectPromise.connect();
172         return reconnectPromise;
173     }
174
175     @Override
176     public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) {
177         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(this.bgpPeerRegistry);
178         final ChannelPipelineInitializer<?> initializer = BGPChannel.createChannelPipelineInitializer(
179             this.handlerFactory, snf);
180         final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
181         final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
182         LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
183         return channelFuture;
184     }
185
186     @Override
187     public BGPPeerRegistry getBGPPeerRegistry() {
188         return this.bgpPeerRegistry;
189     }
190
191     private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer<?> initializer) {
192         final ServerBootstrap serverBootstrap = new ServerBootstrap();
193         if (Epoll.isAvailable()) {
194             serverBootstrap.channel(EpollServerSocketChannel.class);
195             serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
196         } else {
197             serverBootstrap.channel(NioServerSocketChannel.class);
198         }
199         final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
200         serverBootstrap.childHandler(serverChannelHandler);
201
202         serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
203         serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
204         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
205
206         // Make sure we are doing round-robin processing
207         serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
208
209         if (serverBootstrap.config().group() == null) {
210             serverBootstrap.group(this.bossGroup, this.workerGroup);
211         }
212         return serverBootstrap;
213     }
214
215     private static final class BGPChannel {
216         private static final String NEGOTIATOR = "negotiator";
217
218         private BGPChannel() {
219
220         }
221
222         static <S extends BGPSession, T extends BGPSessionNegotiatorFactory<S>> ChannelPipelineInitializer<S>
223             createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
224             return (channel, promise) -> {
225                 channel.pipeline().addLast(hf.getDecoders());
226                 channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
227                 channel.pipeline().addLast(hf.getEncoders());
228             };
229         }
230
231         static <S extends BGPSession> ChannelHandler createClientChannelHandler(
232                 final ChannelPipelineInitializer<S> initializer, final Promise<S> promise) {
233             return new ChannelInitializer<SocketChannel>() {
234                 @Override
235                 protected void initChannel(final SocketChannel channel) {
236                     initializer.initializeChannel(channel, promise);
237                 }
238             };
239         }
240
241         static <S extends BGPSession> ChannelHandler createServerChannelHandler(
242                 final ChannelPipelineInitializer<S> initializer) {
243             return new ChannelInitializer<SocketChannel>() {
244                 @Override
245                 protected void initChannel(final SocketChannel channel) {
246                     initializer.initializeChannel(channel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
247                 }
248             };
249         }
250     }
251 }