Disconnect bgp-rib-impl from global event loop groups
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPDispatcherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.bootstrap.ServerBootstrap;
15 import io.netty.buffer.PooledByteBufAllocator;
16 import io.netty.channel.AdaptiveRecvByteBufAllocator;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandler;
19 import io.netty.channel.ChannelInitializer;
20 import io.netty.channel.ChannelOption;
21 import io.netty.channel.RecvByteBufAllocator;
22 import io.netty.channel.WriteBufferWaterMark;
23 import io.netty.channel.socket.SocketChannel;
24 import io.netty.util.concurrent.DefaultPromise;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.GlobalEventExecutor;
27 import io.netty.util.concurrent.Promise;
28 import java.net.InetSocketAddress;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionConsumerContext;
32 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
33 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
34 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
35 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
36 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
37 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
38 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
39 import org.opendaylight.protocol.concepts.KeyMapping;
40 import org.osgi.service.component.annotations.Activate;
41 import org.osgi.service.component.annotations.Component;
42 import org.osgi.service.component.annotations.Reference;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Implementation of BGPDispatcher.
48  */
49 @Singleton
50 @Component(immediate = true)
51 public final class BGPDispatcherImpl implements BGPDispatcher {
52     private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
53     private static final int SOCKET_BACKLOG_SIZE = 128;
54
55     private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024);
56
57     // An adaptive allocator, so we size our message buffers based on what we receive, but make sure we process one
58     // message at a time. This should be good enough for most cases, although we could optimize it a bit based on
59     // whether we actually negotiate use of large messages -- based on that the range of allocations can be constrained
60     // from the default 64-65536 range to 64-4096.
61     private static final RecvByteBufAllocator RECV_ALLOCATOR = new AdaptiveRecvByteBufAllocator().maxMessagesPerRead(1);
62
63     private final BGPHandlerFactory handlerFactory;
64     private final BGPPeerRegistry bgpPeerRegistry;
65     private final BGPNettyGroups nettyGroups;
66
67     @Inject
68     @Activate
69     public BGPDispatcherImpl(@Reference final BGPExtensionConsumerContext extensions,
70             @Reference final BGPNettyGroups nettyGroups, @Reference final BGPPeerRegistry bgpPeerRegistry) {
71         this.nettyGroups = requireNonNull(nettyGroups);
72         this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry);
73         handlerFactory = new BGPHandlerFactory(extensions.getMessageRegistry());
74     }
75
76     @VisibleForTesting
77     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
78             final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
79         final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.of(), reuseAddress, localAddress);
80         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(bgpPeerRegistry);
81         final ChannelPipelineInitializer<BGPSessionImpl> initializer = BGPChannel.createChannelPipelineInitializer(
82                 handlerFactory, snf);
83
84         final BGPProtocolSessionPromise<BGPSessionImpl> sessionPromise = new BGPProtocolSessionPromise<>(remoteAddress,
85                 retryTimer, clientBootStrap, bgpPeerRegistry);
86         clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
87         sessionPromise.connect();
88         LOG.debug("Client created.");
89         return sessionPromise;
90     }
91
92     private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress,
93             final InetSocketAddress localAddress) {
94         return nettyGroups.createBootstrap(keys)
95             // Make sure we are doing round-robin processing
96             .option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR)
97             .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
98             .option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK)
99             .option(ChannelOption.SO_REUSEADDR, reuseAddress)
100             .localAddress(localAddress);
101     }
102
103     @Override
104     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
105             final InetSocketAddress localAddress, final int retryTimer, final KeyMapping keys) {
106         return createReconnectingClient(remoteAddress, retryTimer, keys, localAddress, false);
107     }
108
109     @VisibleForTesting
110     synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
111             final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress,
112             final boolean reuseAddress) {
113         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(bgpPeerRegistry);
114         final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress, localAddress);
115         final BGPReconnectPromise<?> reconnectPromise = new BGPReconnectPromise<>(GlobalEventExecutor.INSTANCE,
116                 remoteAddress, retryTimer, bootstrap, bgpPeerRegistry,
117                 BGPChannel.createChannelPipelineInitializer(handlerFactory, snf));
118         reconnectPromise.connect();
119         return reconnectPromise;
120     }
121
122     @Override
123     public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) {
124         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(bgpPeerRegistry);
125         final ChannelPipelineInitializer<?> initializer = BGPChannel.createChannelPipelineInitializer(
126             handlerFactory, snf);
127         final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
128         final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
129         LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
130         return channelFuture;
131     }
132
133     @Override
134     public BGPPeerRegistry getBGPPeerRegistry() {
135         return bgpPeerRegistry;
136     }
137
138     private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer<?> initializer) {
139         return nettyGroups.createServerBootstrap()
140             .childHandler(BGPChannel.createServerChannelHandler(initializer))
141             .option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE)
142             .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
143             .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK)
144             // Make sure we are doing round-robin processing
145             .option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
146     }
147
148     private static final class BGPChannel {
149         private static final String NEGOTIATOR = "negotiator";
150
151         private BGPChannel() {
152
153         }
154
155         static <S extends BGPSession, T extends BGPSessionNegotiatorFactory<S>> ChannelPipelineInitializer<S>
156             createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
157             return (channel, promise) -> {
158                 channel.pipeline().addLast(hf.getDecoders());
159                 channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
160                 channel.pipeline().addLast(hf.getEncoders());
161             };
162         }
163
164         static <S extends BGPSession> ChannelHandler createClientChannelHandler(
165                 final ChannelPipelineInitializer<S> initializer, final Promise<S> promise) {
166             return new ChannelInitializer<SocketChannel>() {
167                 @Override
168                 protected void initChannel(final SocketChannel channel) {
169                     initializer.initializeChannel(channel, promise);
170                 }
171             };
172         }
173
174         static <S extends BGPSession> ChannelHandler createServerChannelHandler(
175                 final ChannelPipelineInitializer<S> initializer) {
176             return new ChannelInitializer<SocketChannel>() {
177                 @Override
178                 protected void initChannel(final SocketChannel channel) {
179                     initializer.initializeChannel(channel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
180                 }
181             };
182         }
183     }
184 }