Replace Preconditions.checkNotNull with Objects.requireNonNull
[bgpcep.git] / bgp / bmp-impl / src / main / java / org / opendaylight / protocol / bmp / impl / BmpDispatcherImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.protocol.bmp.impl;
10
11 import static java.util.Objects.requireNonNull;
12
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.bootstrap.ServerBootstrap;
15 import io.netty.buffer.PooledByteBufAllocator;
16 import io.netty.channel.AbstractChannel;
17 import io.netty.channel.Channel;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.ChannelInitializer;
21 import io.netty.channel.ChannelOption;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.epoll.Epoll;
25 import io.netty.channel.epoll.EpollChannelOption;
26 import io.netty.channel.epoll.EpollEventLoopGroup;
27 import io.netty.channel.epoll.EpollServerSocketChannel;
28 import io.netty.channel.epoll.EpollSocketChannel;
29 import io.netty.channel.socket.nio.NioServerSocketChannel;
30 import io.netty.channel.socket.nio.NioSocketChannel;
31 import java.net.InetSocketAddress;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
34 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
35 import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
36 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
37 import org.opendaylight.protocol.concepts.KeyMapping;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 public class BmpDispatcherImpl implements BmpDispatcher {
42
43     private static final Logger LOG = LoggerFactory.getLogger(BmpDispatcherImpl.class);
44
45     private static final int MAX_CONNECTIONS_COUNT = 128;
46
47     private static final int CONNECT_TIMEOUT = 5000;
48     private static final int INITIAL_BACKOFF = 30_000;
49     private static final int MAXIMUM_BACKOFF = 720_000;
50     private static final long TIMEOUT = 10;
51
52     private final BmpHandlerFactory hf;
53     private final EventLoopGroup bossGroup;
54     private final EventLoopGroup workerGroup;
55     private final BmpSessionFactory sessionFactory;
56
57     public BmpDispatcherImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup,
58             final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
59         if (Epoll.isAvailable()) {
60             this.bossGroup = new EpollEventLoopGroup();
61             this.workerGroup = new EpollEventLoopGroup();
62         } else {
63             this.bossGroup = requireNonNull(bossGroup);
64             this.workerGroup = requireNonNull(workerGroup);
65         }
66         this.hf = new BmpHandlerFactory(requireNonNull(registry));
67         this.sessionFactory = requireNonNull(sessionFactory);
68     }
69
70     @Override
71     public ChannelFuture createClient(final InetSocketAddress address, final BmpSessionListenerFactory slf,
72         final KeyMapping keys) {
73
74         final Bootstrap b = new Bootstrap();
75
76         requireNonNull(address);
77
78         if (Epoll.isAvailable()) {
79             b.channel(EpollSocketChannel.class);
80         } else {
81             b.channel(NioSocketChannel.class);
82         }
83         if (!keys.isEmpty()) {
84             if (Epoll.isAvailable()) {
85                 b.option(EpollChannelOption.TCP_MD5SIG, keys);
86             } else {
87                 throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
88             }
89         }
90         b.option(ChannelOption.SO_KEEPALIVE, true);
91         b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
92         b.group(this.workerGroup);
93
94         b.handler(new ChannelInitializer<AbstractChannel>() {
95             @Override
96             protected void initChannel(final AbstractChannel ch) throws Exception {
97                 ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
98                 ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
99             }
100         });
101
102         b.remoteAddress(address);
103         final ChannelFuture channelPromise = b.connect();
104         channelPromise.addListener(new BmpDispatcherImpl.BootstrapListener(b, address));
105         return channelPromise;
106     }
107
108     @Override
109     public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf,
110         final KeyMapping keys) {
111         requireNonNull(address);
112         requireNonNull(slf);
113
114         final ServerBootstrap b = new ServerBootstrap();
115         b.childHandler(new ChannelInitializer<Channel>() {
116             @Override
117             protected void initChannel(final Channel ch) throws Exception {
118                 ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
119                 ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
120             }
121         });
122
123         b.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
124         b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
125
126         if (Epoll.isAvailable()) {
127             b.channel(EpollServerSocketChannel.class);
128         } else {
129             b.channel(NioServerSocketChannel.class);
130         }
131
132         if (!keys.isEmpty()) {
133             if (Epoll.isAvailable()) {
134                 b.option(EpollChannelOption.TCP_MD5SIG, keys);
135             } else {
136                 throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
137             }
138         }
139         b.group(this.bossGroup, this.workerGroup);
140         final ChannelFuture f = b.bind(address);
141
142         LOG.debug("Initiated BMP server {} at {}.", f, address);
143         return f;
144     }
145
146     @Override
147     public void close() {
148         if (Epoll.isAvailable()) {
149             this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
150             this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
151         }
152     }
153
154     private class BootstrapListener implements ChannelFutureListener {
155
156         private final Bootstrap bootstrap;
157
158         private long delay;
159
160         private final InetSocketAddress address;
161
162         public BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
163             this.bootstrap = bootstrap;
164             this.address = address;
165             this.delay = INITIAL_BACKOFF;
166         }
167
168         @Override
169         public void operationComplete(final ChannelFuture cf) throws Exception {
170             if (cf.isCancelled()) {
171                 LOG.debug("Connection {} cancelled!", cf);
172             } else if (cf.isSuccess()) {
173                 LOG.debug("Connection {} succeeded!", cf);
174             } else {
175                 if (this.delay > MAXIMUM_BACKOFF) {
176                     LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP " +
177                         "router {}.", this.address);
178                     cf.cancel(false);
179                     return;
180                 }
181                 final EventLoop loop = cf.channel().eventLoop();
182                 loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
183                 LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
184                     this.address, this.delay);
185                 this.delay *= 2;
186             }
187         }
188     }
189 }