BGPCEP-737: Implement BMP client reconnection
[bgpcep.git] / bgp / bmp-mock / src / main / java / org / opendaylight / protocol / bmp / mock / BmpMockDispatcher.java
index 2caae84ab6878e6f5c17cbdef98cd0cb8711d495..6d7f4a8baf5f997d6de9c7c0a8b74e96ec9455b1 100644 (file)
@@ -10,127 +10,121 @@ package org.opendaylight.protocol.bmp.mock;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.base.Optional;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil;
 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class BmpMockDispatcher {
+final class BmpMockDispatcher implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
     private static final int CONNECT_TIMEOUT = 2000;
-    private static final int MAX_CONNECTIONS_COUNT = 128;
     private static final int INITIAL_BACKOFF = 15_000;
     private final BmpHandlerFactory hf;
     private final BmpSessionFactory sessionFactory;
 
     private final EventLoopGroup bossGroup = new NioEventLoopGroup();
     private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final BmpMockSessionListenerFactory slf;
+    @GuardedBy("this")
+    private boolean close;
 
     BmpMockDispatcher(final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
         this.sessionFactory = requireNonNull(sessionFactory);
+        this.slf = new BmpMockSessionListenerFactory();
         requireNonNull(registry);
         this.hf = new BmpHandlerFactory(registry);
     }
 
-    private Bootstrap createClientInstance(final SocketAddress localAddress) {
-        final NioEventLoopGroup workergroup = new NioEventLoopGroup();
-        final Bootstrap bootstrap = new Bootstrap();
-
-        bootstrap.channel(NioSocketChannel.class);
-        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
-        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
-        bootstrap.group(workergroup);
-
-        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
-            @Override
-            protected void initChannel(final NioSocketChannel ch) throws Exception {
-                ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
-                ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
-            }
-        });
-        bootstrap.localAddress(localAddress);
-        return bootstrap;
-    }
-
-    ChannelFuture createClient(final SocketAddress localAddress, final InetSocketAddress remoteAddress) {
-        requireNonNull(localAddress);
-        requireNonNull(remoteAddress);
-
-        // ideally we should use Bootstrap clones here
-        final Bootstrap bootstrap = createClientInstance(localAddress);
-        bootstrap.remoteAddress(remoteAddress);
+    ChannelFuture createClient(@Nonnull final SocketAddress localAddress,
+            @Nonnull final InetSocketAddress remoteAddress) {
+        final Bootstrap bootstrap = BmpDispatcherUtil.createClientBootstrap(this.sessionFactory, this.hf,
+                BmpDispatcherUtil::createChannelWithEncoder, this.slf, remoteAddress, localAddress, this.workerGroup,
+                CONNECT_TIMEOUT, Optional.absent(), true, false);
         final ChannelFuture channelFuture = bootstrap.connect(remoteAddress);
         LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
-        channelFuture.addListener(new BootstrapListener(bootstrap, remoteAddress));
+        channelFuture.addListener(new BootstrapListener(bootstrap, localAddress, remoteAddress));
         return channelFuture;
     }
 
-    private ServerBootstrap createServerInstance() {
-        final ServerBootstrap serverBootstrap = new ServerBootstrap();
-        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
-            @Override
-            protected void initChannel(final Channel ch) throws Exception {
-                ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
-                ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
-            }
-        });
-
-        serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
-        serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-        serverBootstrap.channel(NioServerSocketChannel.class);
-        serverBootstrap.group(this.bossGroup, this.workerGroup);
-        return serverBootstrap;
-    }
-
     ChannelFuture createServer(final InetSocketAddress localAddress) {
         requireNonNull(localAddress);
-        final ServerBootstrap serverBootstrap = createServerInstance();
+        final ServerBootstrap serverBootstrap = BmpDispatcherUtil.createServerBootstrap(this.sessionFactory,
+                this.hf, this.slf, BmpDispatcherUtil::createChannelWithEncoder,
+                this.bossGroup, this.workerGroup, Optional.absent(), false);
         final ChannelFuture channelFuture = serverBootstrap.bind(localAddress);
         LOG.info("Initiated BMP server at {}.", localAddress);
         return channelFuture;
     }
 
-    private static class BootstrapListener implements ChannelFutureListener {
+    @Override
+    public synchronized void close() {
+        this.close = true;
+    }
+
+    private class BootstrapListener implements ChannelFutureListener {
         private final Bootstrap bootstrap;
-        private final InetSocketAddress address;
+        private final InetSocketAddress remoteAddress;
+        private final SocketAddress localAddress;
         private long delay;
+        private Timer timer = new Timer();
 
-        BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
+        BootstrapListener(final Bootstrap bootstrap, final SocketAddress localAddress,
+                final InetSocketAddress remoteAddress) {
             this.bootstrap = bootstrap;
-            this.address = address;
+            this.remoteAddress = remoteAddress;
+            this.localAddress = localAddress;
             this.delay = INITIAL_BACKOFF;
         }
 
         @Override
-        public void operationComplete(final ChannelFuture cf) throws Exception {
-            if (cf.isCancelled()) {
-                LOG.debug("Connection {} cancelled!", cf);
-            } else if (cf.isSuccess()) {
-                LOG.debug("Connection {} succeeded!", cf);
+        public void operationComplete(final ChannelFuture future) throws Exception {
+            if (future.isCancelled()) {
+                LOG.debug("Connection {} cancelled!", future);
+            } else if (future.isSuccess()) {
+                LOG.debug("Connection {} succeeded!", future);
+                addCloseDetectListener(future.channel());
             } else {
-                final EventLoop loop = cf.channel().eventLoop();
+                final EventLoop loop = future.channel().eventLoop();
                 loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
                 LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
-                        this.address, this.delay);
+                        this.remoteAddress, this.delay);
+            }
+        }
+
+        private void addCloseDetectListener(Channel channel) {
+            //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
+            channel.closeFuture().addListener((ChannelFutureListener) future -> scheduleConnect());
+        }
+
+        private void scheduleConnect() {
+            if (!BmpMockDispatcher.this.close) {
+
+                timer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        createClient(BootstrapListener.this.localAddress,
+                                BmpMockDispatcher.BootstrapListener.this.remoteAddress);
+                    }
+                }, (long) 5);
             }
         }
     }