2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.bgp.rib.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.bootstrap.ServerBootstrap;
15 import io.netty.buffer.PooledByteBufAllocator;
16 import io.netty.channel.AdaptiveRecvByteBufAllocator;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandler;
19 import io.netty.channel.ChannelInitializer;
20 import io.netty.channel.ChannelOption;
21 import io.netty.channel.RecvByteBufAllocator;
22 import io.netty.channel.WriteBufferWaterMark;
23 import io.netty.channel.socket.SocketChannel;
24 import io.netty.util.concurrent.DefaultPromise;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.GlobalEventExecutor;
27 import io.netty.util.concurrent.Promise;
28 import java.net.InetSocketAddress;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionConsumerContext;
32 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
33 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
34 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
35 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
36 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
37 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
38 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
39 import org.opendaylight.protocol.concepts.KeyMapping;
40 import org.osgi.service.component.annotations.Activate;
41 import org.osgi.service.component.annotations.Component;
42 import org.osgi.service.component.annotations.Reference;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Implementation of BGPDispatcher.
50 @Component(immediate = true)
51 public final class BGPDispatcherImpl implements BGPDispatcher {
52 private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
53 private static final int SOCKET_BACKLOG_SIZE = 128;
55 private static final WriteBufferWaterMark WATER_MARK = new WriteBufferWaterMark(128 * 1024, 256 * 1024);
57 // An adaptive allocator, so we size our message buffers based on what we receive, but make sure we process one
58 // message at a time. This should be good enough for most cases, although we could optimize it a bit based on
59 // whether we actually negotiate use of large messages -- based on that the range of allocations can be constrained
60 // from the default 64-65536 range to 64-4096.
61 private static final RecvByteBufAllocator RECV_ALLOCATOR = new AdaptiveRecvByteBufAllocator().maxMessagesPerRead(1);
63 private final BGPHandlerFactory handlerFactory;
64 private final BGPPeerRegistry bgpPeerRegistry;
65 private final BGPNettyGroups nettyGroups;
/**
 * Creates a dispatcher on top of the supplied Netty groups and peer registry.
 *
 * @param extensions extension context whose message registry is used to build the handler factory
 * @param nettyGroups provider of client and server bootstraps
 * @param bgpPeerRegistry registry passed on to negotiator factories and connection promises
 * @throws NullPointerException if any of the arguments is {@code null}
 */
public BGPDispatcherImpl(
        @Reference final BGPExtensionConsumerContext extensions,
        @Reference final BGPNettyGroups nettyGroups,
        @Reference final BGPPeerRegistry bgpPeerRegistry) {
    this.nettyGroups = requireNonNull(nettyGroups);
    this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry);
    // Encoders and decoders for every pipeline are derived from the registered message parsers/serializers.
    this.handlerFactory = new BGPHandlerFactory(extensions.getMessageRegistry());
}
77 public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
78 final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
79 final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.of(), reuseAddress, localAddress);
80 final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(bgpPeerRegistry);
81 final ChannelPipelineInitializer<BGPSessionImpl> initializer = BGPChannel.createChannelPipelineInitializer(
84 final BGPProtocolSessionPromise<BGPSessionImpl> sessionPromise = new BGPProtocolSessionPromise<>(remoteAddress,
85 retryTimer, clientBootStrap, bgpPeerRegistry);
86 clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
87 sessionPromise.connect();
88 LOG.debug("Client created.");
89 return sessionPromise;
92 private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress,
93 final InetSocketAddress localAddress) {
94 return nettyGroups.createBootstrap(keys)
95 // Make sure we are doing round-robin processing
96 .option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR)
97 .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
98 .option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK)
99 .option(ChannelOption.SO_REUSEADDR, reuseAddress)
100 .localAddress(localAddress);
104 public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
105 final InetSocketAddress localAddress, final int retryTimer, final KeyMapping keys) {
106 return createReconnectingClient(remoteAddress, retryTimer, keys, localAddress, false);
110 synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress,
111 final int retryTimer, final KeyMapping keys, final InetSocketAddress localAddress,
112 final boolean reuseAddress) {
113 final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(bgpPeerRegistry);
114 final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress, localAddress);
115 final BGPReconnectPromise<?> reconnectPromise = new BGPReconnectPromise<>(GlobalEventExecutor.INSTANCE,
116 remoteAddress, retryTimer, bootstrap, bgpPeerRegistry,
117 BGPChannel.createChannelPipelineInitializer(handlerFactory, snf));
118 reconnectPromise.connect();
119 return reconnectPromise;
123 public synchronized ChannelFuture createServer(final InetSocketAddress serverAddress) {
124 final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(bgpPeerRegistry);
125 final ChannelPipelineInitializer<?> initializer = BGPChannel.createChannelPipelineInitializer(
126 handlerFactory, snf);
127 final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
128 final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
129 LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
130 return channelFuture;
134 public BGPPeerRegistry getBGPPeerRegistry() {
135 return bgpPeerRegistry;
138 private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer<?> initializer) {
139 return nettyGroups.createServerBootstrap()
140 .childHandler(BGPChannel.createServerChannelHandler(initializer))
141 .option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE)
142 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
143 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK)
144 // Make sure we are doing round-robin processing
145 .option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
148 private static final class BGPChannel {
149 private static final String NEGOTIATOR = "negotiator";
151 private BGPChannel() {
155 static <S extends BGPSession, T extends BGPSessionNegotiatorFactory<S>> ChannelPipelineInitializer<S>
156 createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
157 return (channel, promise) -> {
158 channel.pipeline().addLast(hf.getDecoders());
159 channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
160 channel.pipeline().addLast(hf.getEncoders());
164 static <S extends BGPSession> ChannelHandler createClientChannelHandler(
165 final ChannelPipelineInitializer<S> initializer, final Promise<S> promise) {
166 return new ChannelInitializer<SocketChannel>() {
168 protected void initChannel(final SocketChannel channel) {
169 initializer.initializeChannel(channel, promise);
174 static <S extends BGPSession> ChannelHandler createServerChannelHandler(
175 final ChannelPipelineInitializer<S> initializer) {
176 return new ChannelInitializer<SocketChannel>() {
178 protected void initChannel(final SocketChannel channel) {
179 initializer.initializeChannel(channel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));