BGPCEP-737: Implement BMP client reconnection
[bgpcep.git] / bgp / bmp-impl / src / main / java / org / opendaylight / protocol / bmp / impl / BmpDispatcherImpl.java
index 4fa2b04d5f5cd0696cbc88ab00f851f27f863ccf..6e5454c54d07bf0d7e8e53d1faf8a94c42e19ad8 100644 (file)
@@ -8,28 +8,25 @@
 
 package org.opendaylight.protocol.bmp.impl;
 
+import static java.util.Objects.requireNonNull;
+import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createClientBootstrap;
+import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createServerBootstrap;
+
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AbstractChannel;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollChannelOption;
 import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
 import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
@@ -42,8 +39,6 @@ public class BmpDispatcherImpl implements BmpDispatcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(BmpDispatcherImpl.class);
 
-    private static final int MAX_CONNECTIONS_COUNT = 128;
-
     private static final int CONNECT_TIMEOUT = 5000;
     private static final int INITIAL_BACKOFF = 30_000;
     private static final int MAXIMUM_BACKOFF = 720_000;
@@ -53,6 +48,8 @@ public class BmpDispatcherImpl implements BmpDispatcher {
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final BmpSessionFactory sessionFactory;
+    @GuardedBy("this")
+    private boolean close;
 
     public BmpDispatcherImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup,
             final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
@@ -60,89 +57,38 @@ public class BmpDispatcherImpl implements BmpDispatcher {
             this.bossGroup = new EpollEventLoopGroup();
             this.workerGroup = new EpollEventLoopGroup();
         } else {
-            this.bossGroup = Preconditions.checkNotNull(bossGroup);
-            this.workerGroup = Preconditions.checkNotNull(workerGroup);
+            this.bossGroup = requireNonNull(bossGroup);
+            this.workerGroup = requireNonNull(workerGroup);
         }
-        this.hf = new BmpHandlerFactory(Preconditions.checkNotNull(registry));
-        this.sessionFactory = Preconditions.checkNotNull(sessionFactory);
+        this.hf = new BmpHandlerFactory(requireNonNull(registry));
+        this.sessionFactory = requireNonNull(sessionFactory);
     }
 
     @Override
-    public ChannelFuture createClient(final InetSocketAddress address, final BmpSessionListenerFactory slf, final Optional<KeyMapping> keys) {
-
-        final Bootstrap b = new Bootstrap();
-
-        Preconditions.checkNotNull(address);
-
-        if (Epoll.isAvailable()) {
-            b.channel(EpollSocketChannel.class);
-        } else {
-            b.channel(NioSocketChannel.class);
-        }
-        if (keys.isPresent()) {
-            if (Epoll.isAvailable()) {
-                b.option(EpollChannelOption.TCP_MD5SIG, keys.get());
-            } else {
-                throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
-            }
-        }
-        b.option(ChannelOption.SO_KEEPALIVE, true);
-        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
-        b.group(this.workerGroup);
-
-        b.handler(new ChannelInitializer<AbstractChannel>() {
-            @Override
-            protected void initChannel(final AbstractChannel ch) throws Exception {
-                ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
-                ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
-            }
-        });
-
-        b.remoteAddress(address);
-        final ChannelFuture channelPromise = b.connect();
-        channelPromise.addListener(new BmpDispatcherImpl.BootstrapListener(b, address));
+    public ChannelFuture createClient(final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf,
+            final Optional<KeyMapping> keys) {
+        final Bootstrap bootstrap = createClientBootstrap(this.sessionFactory, this.hf,
+                BmpDispatcherUtil::createChannelWithDecoder, slf, remoteAddress, this.workerGroup,
+                CONNECT_TIMEOUT, keys);
+        final ChannelFuture channelPromise = bootstrap.connect();
+        channelPromise.addListener(new BootstrapListener(bootstrap, remoteAddress, slf, keys));
+        LOG.debug("Initiated BMP Client {} at {}.", channelPromise, remoteAddress);
         return channelPromise;
     }
 
     @Override
-    public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf, final Optional<KeyMapping> keys) {
-        Preconditions.checkNotNull(address);
-        Preconditions.checkNotNull(slf);
-
-        final ServerBootstrap b = new ServerBootstrap();
-        b.childHandler(new ChannelInitializer<Channel>() {
-            @Override
-            protected void initChannel(final Channel ch) throws Exception {
-                ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
-                ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
-            }
-        });
-
-        b.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
-        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
-        if (Epoll.isAvailable()) {
-            b.channel(EpollServerSocketChannel.class);
-        } else {
-            b.channel(NioServerSocketChannel.class);
-        }
-
-        if (keys.isPresent()) {
-            if (Epoll.isAvailable()) {
-                b.option(EpollChannelOption.TCP_MD5SIG, keys.get());
-            } else {
-                throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
-            }
-        }
-        b.group(this.bossGroup, this.workerGroup);
-        final ChannelFuture f = b.bind(address);
-
-        LOG.debug("Initiated BMP server {} at {}.", f, address);
-        return f;
+    public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf,
+            final Optional<KeyMapping> keys) {
+        final ServerBootstrap serverBootstrap = createServerBootstrap(this.sessionFactory, this.hf, slf,
+                BmpDispatcherUtil::createChannelWithDecoder, this.bossGroup, this.workerGroup, keys);
+        final ChannelFuture channelFuture = serverBootstrap.bind(address);
+        LOG.debug("Initiated BMP server {} at {}.", channelFuture, address);
+        return channelFuture;
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
+        this.close = true;
         if (Epoll.isAvailable()) {
             this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
             this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
@@ -150,36 +96,59 @@ public class BmpDispatcherImpl implements BmpDispatcher {
     }
 
     private class BootstrapListener implements ChannelFutureListener {
-
         private final Bootstrap bootstrap;
-
+        private final InetSocketAddress remoteAddress;
+        private final BmpSessionListenerFactory slf;
+        private final Optional<KeyMapping> keys;
         private long delay;
+        private Timer timer = new Timer();
 
-        private final InetSocketAddress address;
-
-        public BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
+        BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress remoteAddress,
+                final BmpSessionListenerFactory slf, final Optional<KeyMapping> keys) {
             this.bootstrap = bootstrap;
-            this.address = address;
+            this.remoteAddress = remoteAddress;
             this.delay = INITIAL_BACKOFF;
+            this.slf = slf;
+            this.keys = keys;
         }
 
         @Override
-        public void operationComplete(final ChannelFuture cf) throws Exception {
-            if (cf.isCancelled()) {
-                LOG.debug("Connection {} cancelled!", cf);
-            } else if (cf.isSuccess()) {
-                LOG.debug("Connection {} succeeded!", cf);
+        public void operationComplete(final ChannelFuture future) throws Exception {
+            if (future.isCancelled()) {
+                LOG.debug("Connection {} cancelled!", future);
+            } else if (future.isSuccess()) {
+                LOG.debug("Connection {} succeeded!", future);
+                addCloseDetectListener(future.channel());
             } else {
                 if (this.delay > MAXIMUM_BACKOFF) {
-                    LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP router {}.", this.address);
-                    cf.cancel(false);
+                    LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP " +
+                            "router {}.", this.remoteAddress);
+                    future.cancel(false);
                     return;
                 }
-                final EventLoop loop = cf.channel().eventLoop();
+                final EventLoop loop = future.channel().eventLoop();
                 loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
-                LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.", this.address, this.delay);
+                LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
+                        this.remoteAddress, this.delay);
                 this.delay *= 2;
             }
         }
+
+        private void addCloseDetectListener(Channel channel) {
+            //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
+            channel.closeFuture().addListener((ChannelFutureListener) future -> scheduleConnect());
+        }
+
+        private void scheduleConnect() {
+            if (!BmpDispatcherImpl.this.close) {
+                timer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        createClient(BootstrapListener.this.remoteAddress, BootstrapListener.this.slf,
+                                BootstrapListener.this.keys);
+                    }
+                }, (long) 5);
+            }
+        }
     }
 }