Replace Preconditions.CheckNotNull per RequireNonNull
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPDispatcherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.bootstrap.ServerBootstrap;
15 import io.netty.buffer.PooledByteBufAllocator;
16 import io.netty.channel.ChannelFuture;
17 import io.netty.channel.ChannelHandler;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.WriteBufferWaterMark;
22 import io.netty.channel.epoll.Epoll;
23 import io.netty.channel.epoll.EpollChannelOption;
24 import io.netty.channel.epoll.EpollEventLoopGroup;
25 import io.netty.channel.epoll.EpollMode;
26 import io.netty.channel.epoll.EpollServerSocketChannel;
27 import io.netty.channel.epoll.EpollSocketChannel;
28 import io.netty.channel.socket.SocketChannel;
29 import io.netty.channel.socket.nio.NioServerSocketChannel;
30 import io.netty.channel.socket.nio.NioSocketChannel;
31 import io.netty.util.concurrent.DefaultPromise;
32 import io.netty.util.concurrent.Future;
33 import io.netty.util.concurrent.GlobalEventExecutor;
34 import io.netty.util.concurrent.Promise;
35 import java.net.InetSocketAddress;
36 import java.util.concurrent.TimeUnit;
37 import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
38 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
39 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
40 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
41 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
42 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
43 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
44 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
45 import org.opendaylight.protocol.concepts.KeyMapping;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Implementation of BGPDispatcher.
51  */
52 public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
53     private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
54     private static final int SOCKET_BACKLOG_SIZE = 128;
55     private static final long TIMEOUT = 10;
56
57     private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024);
58
59     private final BGPHandlerFactory handlerFactory;
60     private final EventLoopGroup bossGroup;
61     private final EventLoopGroup workerGroup;
62     private final BGPPeerRegistry bgpPeerRegistry;
63
64     public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup,
65         final EventLoopGroup workerGroup, final BGPPeerRegistry bgpPeerRegistry) {
66         if (Epoll.isAvailable()) {
67             this.bossGroup = new EpollEventLoopGroup();
68             this.workerGroup = new EpollEventLoopGroup();
69         } else {
70             this.bossGroup = requireNonNull(bossGroup);
71             this.workerGroup = requireNonNull(workerGroup);
72         }
73         this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry);
74         this.handlerFactory = new BGPHandlerFactory(messageRegistry);
75     }
76
77     @Override
78     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress, final int retryTimer) {
79         return createClient(remoteAddress, retryTimer, createClientBootStrap(KeyMapping.getKeyMapping(), false));
80     }
81
82     private synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress,
83         final int retryTimer, final Bootstrap clientBootStrap) {
84         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
85         final ChannelPipelineInitializer<BGPSessionImpl> initializer = BGPChannel.createChannelPipelineInitializer(
86             this.handlerFactory, snf);
87
88         final BGPProtocolSessionPromise<BGPSessionImpl> sessionPromise = new BGPProtocolSessionPromise<>(remoteAddress,
89                 retryTimer, clientBootStrap, this.bgpPeerRegistry);
90         clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
91         sessionPromise.connect();
92         LOG.debug("Client created.");
93         return sessionPromise;
94     }
95
96     @VisibleForTesting
97     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
98         final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
99         final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.getKeyMapping(), reuseAddress);
100         clientBootStrap.localAddress(localAddress);
101         return createClient(remoteAddress, retryTimer, clientBootStrap);
102     }
103
104     private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress) {
105         final Bootstrap bootstrap = new Bootstrap();
106         if (Epoll.isAvailable()) {
107             bootstrap.channel(EpollSocketChannel.class);
108             bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
109         } else {
110             bootstrap.channel(NioSocketChannel.class);
111         }
112         if (!keys.isEmpty()) {
113             if (Epoll.isAvailable()) {
114                 bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
115             } else {
116                 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
117             }
118         }
119
120         // Make sure we are doing round-robin processing
121         bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
122         bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
123         bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
124         bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
125
126         if (bootstrap.group() == null) {
127             bootstrap.group(this.workerGroup);
128         }
129
130         return bootstrap;
131     }
132
133     @Override
134     public synchronized void close() throws InterruptedException {
135         if (Epoll.isAvailable()) {
136             LOG.debug("Closing Dispatcher");
137             this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
138             this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
139         }
140     }
141
142     @Override
143     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
144             final int retryTimer, final KeyMapping keys) {
145         return createReconnectingClient(remoteAddress, retryTimer, keys, null, false);
146     }
147
148     @VisibleForTesting
149     protected synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
150         final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress,
151         final boolean reuseAddress) {
152         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
153         final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress);
154         bootstrap.localAddress(localAddress);
155         final BGPReconnectPromise<?> reconnectPromise = new BGPReconnectPromise<>(GlobalEventExecutor.INSTANCE,
156             remoteAddress, retryTimer, bootstrap, this.bgpPeerRegistry,
157             BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf));
158         reconnectPromise.connect();
159         return reconnectPromise;
160     }
161
162     @Override
163     public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) {
164         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(this.bgpPeerRegistry);
165         final ChannelPipelineInitializer<?> initializer = BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf);
166         final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
167         final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
168         LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
169         return channelFuture;
170     }
171
172     @Override
173     public BGPPeerRegistry getBGPPeerRegistry() {
174         return this.bgpPeerRegistry;
175     }
176
177     private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
178         final ServerBootstrap serverBootstrap = new ServerBootstrap();
179         if (Epoll.isAvailable()) {
180             serverBootstrap.channel(EpollServerSocketChannel.class);
181             serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
182         } else {
183             serverBootstrap.channel(NioServerSocketChannel.class);
184         }
185         final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
186         serverBootstrap.childHandler(serverChannelHandler);
187
188         serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
189         serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
190         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
191
192         // Make sure we are doing round-robin processing
193         serverBootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
194
195         if (serverBootstrap.group() == null) {
196             serverBootstrap.group(this.bossGroup, this.workerGroup);
197         }
198         return serverBootstrap;
199     }
200
201     private static final class BGPChannel {
202         private static final String NEGOTIATOR = "negotiator";
203
204         private BGPChannel() {
205
206         }
207
208         static <S extends BGPSession, T extends BGPSessionNegotiatorFactory<S>> ChannelPipelineInitializer<S>
209         createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
210             return (channel, promise) -> {
211                 channel.pipeline().addLast(hf.getDecoders());
212                 channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
213                 channel.pipeline().addLast(hf.getEncoders());
214             };
215         }
216
217         static <S extends BGPSession> ChannelHandler createClientChannelHandler(
218                 final ChannelPipelineInitializer<S> initializer, final Promise<S> promise) {
219             return new ChannelInitializer<SocketChannel>() {
220                 @Override
221                 protected void initChannel(final SocketChannel channel) {
222                     initializer.initializeChannel(channel, promise);
223                 }
224             };
225         }
226
227         static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) {
228             return new ChannelInitializer<SocketChannel>() {
229                 @Override
230                 protected void initChannel(final SocketChannel channel) {
231                     initializer.initializeChannel(channel, new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
232                 }
233             };
234         }
235     }
236 }