BUG-7706: Fix ServiceUnavailableException
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPDispatcherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.bootstrap.ServerBootstrap;
15 import io.netty.buffer.PooledByteBufAllocator;
16 import io.netty.channel.ChannelFuture;
17 import io.netty.channel.ChannelHandler;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.epoll.Epoll;
22 import io.netty.channel.epoll.EpollChannelOption;
23 import io.netty.channel.epoll.EpollEventLoopGroup;
24 import io.netty.channel.epoll.EpollMode;
25 import io.netty.channel.epoll.EpollServerSocketChannel;
26 import io.netty.channel.epoll.EpollSocketChannel;
27 import io.netty.channel.socket.SocketChannel;
28 import io.netty.channel.socket.nio.NioServerSocketChannel;
29 import io.netty.channel.socket.nio.NioSocketChannel;
30 import io.netty.util.concurrent.DefaultPromise;
31 import io.netty.util.concurrent.Future;
32 import io.netty.util.concurrent.GlobalEventExecutor;
33 import io.netty.util.concurrent.Promise;
34 import java.net.InetSocketAddress;
35 import java.util.concurrent.TimeUnit;
36 import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
37 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
38 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
39 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
40 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
41 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
42 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
43 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
44 import org.opendaylight.protocol.concepts.KeyMapping;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Implementation of BGPDispatcher.
50  */
51 public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
52     private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
53     private static final int SOCKET_BACKLOG_SIZE = 128;
54     private static final int HIGH_WATER_MARK = 256 * 1024;
55     private static final int LOW_WATER_MARK = 128 * 1024;
56     private static final long TIMEOUT = 10;
57
58     private final BGPHandlerFactory handlerFactory;
59     private final EventLoopGroup bossGroup;
60     private final EventLoopGroup workerGroup;
61     private final BGPPeerRegistry bgpPeerRegistry;
62
63     public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup,
64         final EventLoopGroup workerGroup, final BGPPeerRegistry bgpPeerRegistry) {
65         if (Epoll.isAvailable()) {
66             this.bossGroup = new EpollEventLoopGroup();
67             this.workerGroup = new EpollEventLoopGroup();
68         } else {
69             this.bossGroup = Preconditions.checkNotNull(bossGroup);
70             this.workerGroup = Preconditions.checkNotNull(workerGroup);
71         }
72         this.bgpPeerRegistry = Preconditions.checkNotNull(bgpPeerRegistry);
73         this.handlerFactory = new BGPHandlerFactory(messageRegistry);
74     }
75
76     @Override
77     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress, final int retryTimer) {
78         return createClient(remoteAddress, retryTimer, createClientBootStrap(Optional.absent(), false));
79     }
80
81     private synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress,
82         final int retryTimer, final Bootstrap clientBootStrap) {
83         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
84         final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf);
85
86         final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(remoteAddress, retryTimer,
87             clientBootStrap, this.bgpPeerRegistry);
88         clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
89         sessionPromise.connect();
90         LOG.debug("Client created.");
91         return sessionPromise;
92     }
93
94     @VisibleForTesting
95     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
96         final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
97         final Bootstrap clientBootStrap = createClientBootStrap(Optional.absent(), reuseAddress);
98         clientBootStrap.localAddress(localAddress);
99         return createClient(remoteAddress, retryTimer, clientBootStrap);
100     }
101
102     private synchronized Bootstrap createClientBootStrap(final Optional<KeyMapping> keys, final boolean reuseAddress) {
103         final Bootstrap bootstrap = new Bootstrap();
104         if (Epoll.isAvailable()) {
105             bootstrap.channel(EpollSocketChannel.class);
106             bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
107         } else {
108             bootstrap.channel(NioSocketChannel.class);
109         }
110         if (keys.isPresent()) {
111             if (Epoll.isAvailable()) {
112                 bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.get());
113             } else {
114                 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
115             }
116         }
117
118         // Make sure we are doing round-robin processing
119         bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
120         bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
121         bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
122         bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
123         bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
124
125         if (bootstrap.group() == null) {
126             bootstrap.group(this.workerGroup);
127         }
128
129         return bootstrap;
130     }
131
132     @Override
133     public synchronized void close() throws InterruptedException {
134         if (Epoll.isAvailable()) {
135             LOG.debug("Closing Dispatcher");
136             this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
137             this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
138         }
139     }
140
141     @Override
142     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
143             final int retryTimer, final Optional<KeyMapping> keys) {
144         return createReconnectingClient(remoteAddress, retryTimer, keys, null, false);
145     }
146
147     @VisibleForTesting
148     protected synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
149         final int retryTimer, final Optional<KeyMapping> keys, final InetSocketAddress localAddress,
150         final boolean reuseAddress) {
151         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(this.bgpPeerRegistry);
152         final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress);
153         bootstrap.localAddress(localAddress);
154         final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE,
155             remoteAddress, retryTimer, bootstrap, this.bgpPeerRegistry,
156             BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf));
157         reconnectPromise.connect();
158         return reconnectPromise;
159     }
160
161     @Override
162     public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) {
163         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(this.bgpPeerRegistry);
164         final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(this.handlerFactory, snf);
165         final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
166         final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
167         LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
168         return channelFuture;
169     }
170
171     @Override
172     public BGPPeerRegistry getBGPPeerRegistry() {
173         return this.bgpPeerRegistry;
174     }
175
176     private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
177         final ServerBootstrap serverBootstrap = new ServerBootstrap();
178         if (Epoll.isAvailable()) {
179             serverBootstrap.channel(EpollServerSocketChannel.class);
180             serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
181         } else {
182             serverBootstrap.channel(NioServerSocketChannel.class);
183         }
184         final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
185         serverBootstrap.childHandler(serverChannelHandler);
186
187         serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
188         serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
189         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
190         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
191
192         // Make sure we are doing round-robin processing
193         serverBootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
194
195         if (serverBootstrap.group() == null) {
196             serverBootstrap.group(this.bossGroup, this.workerGroup);
197         }
198         return serverBootstrap;
199     }
200
201     private static final class BGPChannel {
202         private static final String NEGOTIATOR = "negotiator";
203
204         private BGPChannel() {
205
206         }
207
208         static <T extends BGPSessionNegotiatorFactory> ChannelPipelineInitializer
209         createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
210             return (channel, promise) -> {
211                 channel.pipeline().addLast(hf.getDecoders());
212                 channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
213                 channel.pipeline().addLast(hf.getEncoders());
214             };
215         }
216
217         static <S extends BGPSession> ChannelHandler createClientChannelHandler(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
218             return new ChannelInitializer<SocketChannel>() {
219                 @Override
220                 protected void initChannel(final SocketChannel channel) {
221                     initializer.initializeChannel(channel, promise);
222                 }
223             };
224         }
225
226         static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) {
227             return new ChannelInitializer<SocketChannel>() {
228                 @Override
229                 protected void initChannel(final SocketChannel channel) {
230                     initializer.initializeChannel(channel, new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
231                 }
232             };
233         }
234     }
235 }