2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.bgp.rib.impl;
10 import com.google.common.base.Preconditions;
11 import io.netty.bootstrap.Bootstrap;
12 import io.netty.bootstrap.ServerBootstrap;
13 import io.netty.buffer.PooledByteBufAllocator;
14 import io.netty.channel.ChannelFuture;
15 import io.netty.channel.ChannelHandler;
16 import io.netty.channel.ChannelInitializer;
17 import io.netty.channel.ChannelOption;
18 import io.netty.channel.EventLoopGroup;
19 import io.netty.channel.socket.SocketChannel;
20 import io.netty.channel.socket.nio.NioServerSocketChannel;
21 import io.netty.channel.socket.nio.NioSocketChannel;
22 import io.netty.util.concurrent.DefaultPromise;
23 import io.netty.util.concurrent.EventExecutor;
24 import io.netty.util.concurrent.Future;
25 import io.netty.util.concurrent.GlobalEventExecutor;
26 import io.netty.util.concurrent.Promise;
27 import java.net.InetSocketAddress;
28 import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
29 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
30 import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
31 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
32 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
33 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
34 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
35 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
36 import org.opendaylight.protocol.framework.ReconnectStrategy;
37 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
38 import org.opendaylight.tcpmd5.api.KeyMapping;
39 import org.opendaylight.tcpmd5.netty.MD5ChannelFactory;
40 import org.opendaylight.tcpmd5.netty.MD5ChannelOption;
41 import org.opendaylight.tcpmd5.netty.MD5ServerChannelFactory;
42 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Implementation of BGPDispatcher.
49 public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
50 private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
51 private static final int SOCKET_BACKLOG_SIZE = 128;
52 private final MD5ServerChannelFactory<?> scf;
53 private final MD5ChannelFactory<?> cf;
54 private final BGPHandlerFactory hf;
55 private final EventLoopGroup bossGroup;
56 private final EventLoopGroup workerGroup;
57 private final EventExecutor executor;
58 private KeyMapping keys;
60 public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
61 this(messageRegistry, bossGroup, workerGroup, null, null);
64 public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MD5ChannelFactory<?> cf, final MD5ServerChannelFactory<?> scf) {
65 this.bossGroup = Preconditions.checkNotNull(bossGroup);
66 this.workerGroup = Preconditions.checkNotNull(workerGroup);
67 this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
68 this.hf = new BGPHandlerFactory(messageRegistry);
75 public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress address,
76 final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy) {
77 final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, listener);
78 final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer
79 (BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders());
81 final Bootstrap b = new Bootstrap();
82 final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, b);
83 b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true));
84 b.handler(BGPChannel.createChannelInitializer(initializer, p));
85 this.customizeBootstrap(b);
86 this.setWorkerGroup(b);
88 LOG.debug("Client created.");
95 this.workerGroup.shutdownGracefully();
97 this.bossGroup.shutdownGracefully();
102 public synchronized Future<Void> createReconnectingClient(final InetSocketAddress address,
103 final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory connectStrategyFactory,
104 final KeyMapping keys) {
105 final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, peerRegistry);
108 final Bootstrap b = new Bootstrap();
109 final BGPReconnectPromise p = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, address,
110 connectStrategyFactory, b, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders()));
111 b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true));
112 this.customizeBootstrap(b);
113 this.setWorkerGroup(b);
122 public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator) {
123 final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(sessionValidator, registry);
124 final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer
125 (BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders());
126 final ServerBootstrap b = new ServerBootstrap();
127 b.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
128 b.option(ChannelOption.SO_BACKLOG, Integer.valueOf(SOCKET_BACKLOG_SIZE));
129 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
130 this.customizeBootstrap(b);
132 final ChannelFuture f = b.bind(address);
133 LOG.debug("Initiated server {} at {}.", f, address);
137 protected void customizeBootstrap(final Bootstrap b) {
138 if (this.keys != null && !this.keys.isEmpty()) {
139 if (this.cf == null) {
140 throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
142 b.channelFactory(this.cf);
143 b.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
146 // Make sure we are doing round-robin processing
147 b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
150 private void customizeBootstrap(final ServerBootstrap b) {
151 if (this.keys != null && !this.keys.isEmpty()) {
152 if (this.scf == null) {
153 throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
155 b.channelFactory(this.scf);
156 b.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
159 // Make sure we are doing round-robin processing
160 b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
162 if (b.group() == null) {
163 b.group(this.bossGroup, this.workerGroup);
167 b.channel(NioServerSocketChannel.class);
168 } catch (IllegalStateException e) {
169 LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
173 private void setWorkerGroup(final Bootstrap b) {
174 if (b.group() == null) {
175 b.group(this.workerGroup);
178 b.channel(NioSocketChannel.class);
179 } catch (IllegalStateException e) {
180 LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
184 public final static class BGPChannel {
185 private static final String NEGOTIATOR = "negotiator";
187 private BGPChannel() {
191 public static <T extends BGPSessionNegotiatorFactory> ChannelPipelineInitializer createChannelPipelineInitializer(final ChannelHandler[] channelDecoder,
193 final ChannelHandler[] channelEncoder) {
194 return new ChannelPipelineInitializer() {
196 public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
197 ch.pipeline().addLast(channelDecoder);
198 ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(ch, promise));
199 ch.pipeline().addLast(channelEncoder);
204 public static ChannelHandler createChannelInitializer(final ChannelPipelineInitializer initializer, final Promise<BGPSessionImpl> promise) {
205 return new ChannelInitializer<SocketChannel>() {
207 protected void initChannel(SocketChannel ch) {
208 initializer.initializeChannel(ch, promise);