BGPCEP-737: Implement BMP client reconnection 14/66714/3
authorClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Thu, 21 Dec 2017 10:46:36 +0000 (11:46 +0100)
committerClaudio David Gasparini <claudio.gasparini@pantheon.tech>
Fri, 22 Dec 2017 10:38:54 +0000 (10:38 +0000)
after succesful connection goes down.
Remove duplicate code.

Change-Id: I8690de7d6a49c6c92e319c840a37a1fe043b9775
Signed-off-by: Claudio D. Gasparini <claudio.gasparini@pantheon.tech>
12 files changed:
bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpDispatcherImpl.java
bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpDispatcherUtil.java [new file with mode: 0644]
bmp/bmp-impl/src/test/java/org/opendaylight/protocol/bmp/impl/app/BmpMonitorImplTest.java
bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMock.java
bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockDispatcher.java
bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionFactory.java [new file with mode: 0644]
bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionListener.java [new file with mode: 0644]
bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionListenerFactory.java [new file with mode: 0644]
bmp/bmp-mock/src/test/java/org/opendaylight/protocol/bmp/mock/BmpMockDispatcherTest.java
bmp/bmp-mock/src/test/java/org/opendaylight/protocol/bmp/mock/BmpMockTest.java
bmp/bmp-spi/src/main/java/org/opendaylight/protocol/bmp/api/BmpSessionFactory.java
bmp/bmp-spi/src/main/java/org/opendaylight/protocol/bmp/api/BmpSessionListenerFactory.java

index 94df95ccf2919e1b4cabd7ead1da0136fb15a12b..6fdb666d48b6f9def2416aa7b39a9994e9da0ea1 100644 (file)
@@ -9,27 +9,23 @@
 package org.opendaylight.protocol.bmp.impl;
 
 import static java.util.Objects.requireNonNull;
+import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createClientBootstrap;
+import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createServerBootstrap;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AbstractChannel;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollChannelOption;
 import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
 import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
@@ -42,8 +38,6 @@ public class BmpDispatcherImpl implements BmpDispatcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(BmpDispatcherImpl.class);
 
-    private static final int MAX_CONNECTIONS_COUNT = 128;
-
     private static final int CONNECT_TIMEOUT = 5000;
     private static final int INITIAL_BACKOFF = 30_000;
     private static final int MAXIMUM_BACKOFF = 720_000;
@@ -53,6 +47,8 @@ public class BmpDispatcherImpl implements BmpDispatcher {
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final BmpSessionFactory sessionFactory;
+    @GuardedBy("this")
+    private boolean close;
 
     public BmpDispatcherImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup,
             final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
@@ -68,83 +64,30 @@ public class BmpDispatcherImpl implements BmpDispatcher {
     }
 
     @Override
-    public ChannelFuture createClient(final InetSocketAddress address, final BmpSessionListenerFactory slf,
-        final KeyMapping keys) {
-
-        final Bootstrap b = new Bootstrap();
-
-        requireNonNull(address);
-
-        if (Epoll.isAvailable()) {
-            b.channel(EpollSocketChannel.class);
-        } else {
-            b.channel(NioSocketChannel.class);
-        }
-        if (!keys.isEmpty()) {
-            if (Epoll.isAvailable()) {
-                b.option(EpollChannelOption.TCP_MD5SIG, keys);
-            } else {
-                throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
-            }
-        }
-        b.option(ChannelOption.SO_KEEPALIVE, true);
-        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
-        b.group(this.workerGroup);
-
-        b.handler(new ChannelInitializer<AbstractChannel>() {
-            @Override
-            protected void initChannel(final AbstractChannel ch) throws Exception {
-                ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
-                ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
-            }
-        });
-
-        b.remoteAddress(address);
-        final ChannelFuture channelPromise = b.connect();
-        channelPromise.addListener(new BmpDispatcherImpl.BootstrapListener(b, address));
+    public ChannelFuture createClient(final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf,
+            final KeyMapping keys) {
+        final Bootstrap bootstrap = createClientBootstrap(this.sessionFactory, this.hf,
+                BmpDispatcherUtil::createChannelWithDecoder, slf, remoteAddress, this.workerGroup,
+                CONNECT_TIMEOUT, keys);
+        final ChannelFuture channelPromise = bootstrap.connect();
+        channelPromise.addListener(new BootstrapListener(channelPromise, bootstrap, remoteAddress, slf, keys));
+        LOG.debug("Initiated BMP Client {} at {}.", channelPromise, remoteAddress);
         return channelPromise;
     }
 
     @Override
     public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf,
-        final KeyMapping keys) {
-        requireNonNull(address);
-        requireNonNull(slf);
-
-        final ServerBootstrap b = new ServerBootstrap();
-        b.childHandler(new ChannelInitializer<Channel>() {
-            @Override
-            protected void initChannel(final Channel ch) throws Exception {
-                ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
-                ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
-            }
-        });
-
-        b.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
-        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
-        if (Epoll.isAvailable()) {
-            b.channel(EpollServerSocketChannel.class);
-        } else {
-            b.channel(NioServerSocketChannel.class);
-        }
-
-        if (!keys.isEmpty()) {
-            if (Epoll.isAvailable()) {
-                b.option(EpollChannelOption.TCP_MD5SIG, keys);
-            } else {
-                throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
-            }
-        }
-        b.group(this.bossGroup, this.workerGroup);
-        final ChannelFuture f = b.bind(address);
-
-        LOG.debug("Initiated BMP server {} at {}.", f, address);
-        return f;
+            final KeyMapping keys) {
+        final ServerBootstrap serverBootstrap = createServerBootstrap(this.sessionFactory, this.hf, slf,
+                BmpDispatcherUtil::createChannelWithDecoder, this.bossGroup, this.workerGroup, keys);
+        final ChannelFuture channelFuture = serverBootstrap.bind(address);
+        LOG.debug("Initiated BMP server {} at {}.", channelFuture, address);
+        return channelFuture;
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
+        this.close = true;
         if (Epoll.isAvailable()) {
             this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
             this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
@@ -152,38 +95,59 @@ public class BmpDispatcherImpl implements BmpDispatcher {
     }
 
     private class BootstrapListener implements ChannelFutureListener {
-
         private final Bootstrap bootstrap;
-
+        private final InetSocketAddress remoteAddress;
+        private final BmpSessionListenerFactory slf;
+        private final KeyMapping keys;
         private long delay;
+        private Timer timer = new Timer();
 
-        private final InetSocketAddress address;
-
-        public BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
+        BootstrapListener(final ChannelFuture channelPromise, final Bootstrap bootstrap,
+                final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf, final KeyMapping keys) {
             this.bootstrap = bootstrap;
-            this.address = address;
+            this.remoteAddress = remoteAddress;
             this.delay = INITIAL_BACKOFF;
+            this.slf = slf;
+            this.keys = keys;
         }
 
         @Override
-        public void operationComplete(final ChannelFuture cf) throws Exception {
-            if (cf.isCancelled()) {
-                LOG.debug("Connection {} cancelled!", cf);
-            } else if (cf.isSuccess()) {
-                LOG.debug("Connection {} succeeded!", cf);
+        public void operationComplete(final ChannelFuture future) throws Exception {
+            if (future.isCancelled()) {
+                LOG.debug("Connection {} cancelled!", future);
+            } else if (future.isSuccess()) {
+                LOG.debug("Connection {} succeeded!", future);
+                addCloseDetectListener(future.channel());
             } else {
                 if (this.delay > MAXIMUM_BACKOFF) {
                     LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP " +
-                        "router {}.", this.address);
-                    cf.cancel(false);
+                            "router {}.", this.remoteAddress);
+                    future.cancel(false);
                     return;
                 }
-                final EventLoop loop = cf.channel().eventLoop();
+                final EventLoop loop = future.channel().eventLoop();
                 loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
                 LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
-                    this.address, this.delay);
+                        this.remoteAddress, this.delay);
                 this.delay *= 2;
             }
         }
+
+        private void addCloseDetectListener(Channel channel) {
+            //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
+            channel.closeFuture().addListener((ChannelFutureListener) future -> scheduleConnect());
+        }
+
+        private void scheduleConnect() {
+            if (!BmpDispatcherImpl.this.close) {
+                timer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        createClient(BootstrapListener.this.remoteAddress, BootstrapListener.this.slf,
+                                BootstrapListener.this.keys);
+                    }
+                }, (long) 5);
+            }
+        }
     }
 }
diff --git a/bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpDispatcherUtil.java b/bmp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/BmpDispatcherUtil.java
new file mode 100644 (file)
index 0000000..ea57315
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bmp.impl;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
+import org.opendaylight.protocol.concepts.KeyMapping;
+
+public final class BmpDispatcherUtil {
+    private static final int MAX_CONNECTIONS_COUNT = 128;
+
+    private BmpDispatcherUtil() {
+        throw new UnsupportedOperationException();
+    }
+
+    public static ChannelInitializer<AbstractChannel> createChannelWithDecoder(
+            @Nonnull final BmpSessionFactory sessionFactory,
+            @Nonnull final BmpHandlerFactory hf,
+            @Nullable final BmpSessionListenerFactory slf) {
+        return new ChannelInitializer<AbstractChannel>() {
+            @Override
+            protected void initChannel(final AbstractChannel ch) throws Exception {
+                ch.pipeline().addLast(hf.getDecoders());
+                ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
+            }
+        };
+    }
+
+    public static ChannelInitializer<AbstractChannel> createChannelWithEncoder(
+            @Nonnull final BmpSessionFactory sessionFactory,
+            @Nonnull final BmpHandlerFactory hf,
+            @Nullable final BmpSessionListenerFactory slf) {
+        return new ChannelInitializer<AbstractChannel>() {
+            @Override
+            protected void initChannel(final AbstractChannel ch) throws Exception {
+                ch.pipeline().addLast(hf.getEncoders());
+                ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
+            }
+        };
+    }
+
+    /**
+     * To be used by BMP Dispatcher mainly.
+     */
+    public static ServerBootstrap createServerBootstrap(
+            @Nonnull final BmpSessionFactory sessionFactory,
+            @Nonnull final BmpHandlerFactory hf,
+            @Nullable final BmpSessionListenerFactory slf,
+            @Nonnull CreateChannel createChannel,
+            @Nonnull final EventLoopGroup bossGroup,
+            @Nonnull final EventLoopGroup workerGroup,
+            @Nonnull final KeyMapping keys) {
+        return createServerBootstrap(sessionFactory, hf, slf, createChannel, bossGroup, workerGroup, keys,
+                true);
+    }
+
+    public static ServerBootstrap createServerBootstrap(
+            @Nonnull final BmpSessionFactory sessionFactory,
+            @Nonnull final BmpHandlerFactory hf,
+            @Nullable final BmpSessionListenerFactory slf,
+            @Nonnull CreateChannel createChannel,
+            @Nonnull final EventLoopGroup bossGroup,
+            @Nonnull final EventLoopGroup workerGroup,
+            @Nonnull final KeyMapping keys,
+            boolean tryEpollSocket) {
+
+        final ServerBootstrap serverBootstrap = new ServerBootstrap();
+        serverBootstrap.childHandler(createChannel.create(sessionFactory, hf, slf));
+        serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
+        serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        serverBootstrap.group(bossGroup, workerGroup);
+
+        if (!tryEpollSocket) {
+            serverBootstrap.channel(NioServerSocketChannel.class);
+        } else {
+            if (Epoll.isAvailable()) {
+                serverBootstrap.channel(EpollServerSocketChannel.class);
+            } else {
+                serverBootstrap.channel(NioServerSocketChannel.class);
+            }
+
+            if (!keys.isEmpty()) {
+                if (Epoll.isAvailable()) {
+                    serverBootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
+                } else {
+                    throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
+                }
+            }
+        }
+
+        return serverBootstrap;
+    }
+
+    /**
+     * To be used by BMP Dispatcher mainly.
+     */
+    public static Bootstrap createClientBootstrap(
+            @Nonnull final BmpSessionFactory sessionFactory,
+            @Nonnull final BmpHandlerFactory hf,
+            @Nonnull CreateChannel createChannel,
+            @Nonnull final BmpSessionListenerFactory slf,
+            @Nonnull final InetSocketAddress remoteAddress,
+            @Nonnull final EventLoopGroup workerGroup,
+            final int connectTimeout,
+            @Nonnull final KeyMapping keys) {
+        return createClientBootstrap(sessionFactory, hf, createChannel, slf, remoteAddress, null,
+                workerGroup, connectTimeout, keys, false, true);
+    }
+
+    public static Bootstrap createClientBootstrap(
+            @Nonnull final BmpSessionFactory sessionFactory,
+            @Nonnull final BmpHandlerFactory hf,
+            @Nonnull CreateChannel createChannel,
+            @Nonnull final BmpSessionListenerFactory slf,
+            @Nonnull final InetSocketAddress remoteAddress,
+            @Nullable final SocketAddress localAddress,
+            @Nonnull final EventLoopGroup workerGroup,
+            final int connectTimeout,
+            @Nonnull final KeyMapping keys,
+            boolean reuseAddress,
+            boolean tryEpollSocket) {
+        final Bootstrap bootstrap = new Bootstrap();
+        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
+        bootstrap.group(workerGroup);
+        bootstrap.handler(createChannel.create(sessionFactory, hf, slf));
+        if (localAddress != null) {
+            bootstrap.localAddress(localAddress);
+        }
+        bootstrap.remoteAddress(remoteAddress);
+
+        if (!tryEpollSocket) {
+            bootstrap.channel(NioSocketChannel.class);
+
+        } else {
+            if (Epoll.isAvailable()) {
+                bootstrap.channel(EpollSocketChannel.class);
+            } else {
+                bootstrap.channel(NioSocketChannel.class);
+            }
+            if (!keys.isEmpty()) {
+                if (Epoll.isAvailable()) {
+                    bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
+                } else {
+                    throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
+                }
+            }
+        }
+        return bootstrap;
+    }
+
+    @FunctionalInterface
+    public interface CreateChannel {
+        ChannelInitializer<AbstractChannel> create(
+                @Nonnull final BmpSessionFactory sessionFactory,
+                @Nonnull final BmpHandlerFactory hf,
+                @Nullable final BmpSessionListenerFactory slf);
+
+    }
+}
index 2453cd52602cede7d68fd2b8c445d2572bf8b224..2f4e9a9d2d25ee929cb5218ec3b10e7c7a81d028 100644 (file)
@@ -308,7 +308,8 @@ public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
                 return router;
             });
 
-            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil
+                    .createInitMsg("description", "name", "some info")));
 
             readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
                 assertFalse(monitor.getRouter().isEmpty());
index d3a7083e72bbc5cbe40a76c5bd518a59959497d5..8562c2859e575c8af884861aad19818d749144e5 100644 (file)
@@ -53,9 +53,7 @@ public final class BmpMock {
         final BmpExtensionProviderActivator bmpActivator = new BmpActivator(bgpCtx);
         bmpActivator.start(ctx);
 
-        return new BmpMockDispatcher(ctx.getBmpMessageRegistry(), (channel, sessionListenerFactory) ->
-                new BmpMockSession(arguments.getPeersCount(),
-                        arguments.getPrePolicyRoutesCount(), arguments.getPostPolicyRoutesCount()));
+        return new BmpMockDispatcher(ctx.getBmpMessageRegistry(), new BmpMockSessionFactory(arguments));
     }
 
     private static void deployClients(final BmpMockDispatcher dispatcher, final BmpMockArguments arguments) {
index 2caae84ab6878e6f5c17cbdef98cd0cb8711d495..8539ab37899d104c23446739d831113c250322c3 100644 (file)
@@ -12,125 +12,120 @@ import static java.util.Objects.requireNonNull;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil;
 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
+import org.opendaylight.protocol.concepts.KeyMapping;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class BmpMockDispatcher {
+final class BmpMockDispatcher implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
     private static final int CONNECT_TIMEOUT = 2000;
-    private static final int MAX_CONNECTIONS_COUNT = 128;
     private static final int INITIAL_BACKOFF = 15_000;
+    private static final KeyMapping KEY_MAPPING = KeyMapping.getKeyMapping();
     private final BmpHandlerFactory hf;
     private final BmpSessionFactory sessionFactory;
 
     private final EventLoopGroup bossGroup = new NioEventLoopGroup();
     private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final BmpMockSessionListenerFactory slf;
+    @GuardedBy("this")
+    private boolean close;
 
     BmpMockDispatcher(final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
         this.sessionFactory = requireNonNull(sessionFactory);
+        this.slf = new BmpMockSessionListenerFactory();
         requireNonNull(registry);
         this.hf = new BmpHandlerFactory(registry);
     }
 
-    private Bootstrap createClientInstance(final SocketAddress localAddress) {
-        final NioEventLoopGroup workergroup = new NioEventLoopGroup();
-        final Bootstrap bootstrap = new Bootstrap();
-
-        bootstrap.channel(NioSocketChannel.class);
-        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
-        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
-        bootstrap.group(workergroup);
-
-        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
-            @Override
-            protected void initChannel(final NioSocketChannel ch) throws Exception {
-                ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
-                ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
-            }
-        });
-        bootstrap.localAddress(localAddress);
-        return bootstrap;
-    }
-
-    ChannelFuture createClient(final SocketAddress localAddress, final InetSocketAddress remoteAddress) {
-        requireNonNull(localAddress);
-        requireNonNull(remoteAddress);
-
-        // ideally we should use Bootstrap clones here
-        final Bootstrap bootstrap = createClientInstance(localAddress);
-        bootstrap.remoteAddress(remoteAddress);
+    ChannelFuture createClient(@Nonnull final SocketAddress localAddress,
+            @Nonnull final InetSocketAddress remoteAddress) {
+        final Bootstrap bootstrap = BmpDispatcherUtil.createClientBootstrap(this.sessionFactory, this.hf,
+                BmpDispatcherUtil::createChannelWithEncoder, this.slf, remoteAddress, localAddress, this.workerGroup,
+                CONNECT_TIMEOUT, KEY_MAPPING, true, false);
         final ChannelFuture channelFuture = bootstrap.connect(remoteAddress);
         LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
-        channelFuture.addListener(new BootstrapListener(bootstrap, remoteAddress));
+        channelFuture.addListener(new BootstrapListener(channelFuture, bootstrap, localAddress, remoteAddress));
         return channelFuture;
     }
 
-    private ServerBootstrap createServerInstance() {
-        final ServerBootstrap serverBootstrap = new ServerBootstrap();
-        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
-            @Override
-            protected void initChannel(final Channel ch) throws Exception {
-                ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
-                ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
-            }
-        });
-
-        serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
-        serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-        serverBootstrap.channel(NioServerSocketChannel.class);
-        serverBootstrap.group(this.bossGroup, this.workerGroup);
-        return serverBootstrap;
-    }
-
     ChannelFuture createServer(final InetSocketAddress localAddress) {
         requireNonNull(localAddress);
-        final ServerBootstrap serverBootstrap = createServerInstance();
+        final ServerBootstrap serverBootstrap = BmpDispatcherUtil.createServerBootstrap(this.sessionFactory,
+                this.hf, this.slf, BmpDispatcherUtil::createChannelWithEncoder,
+                this.bossGroup, this.workerGroup, KEY_MAPPING, false);
         final ChannelFuture channelFuture = serverBootstrap.bind(localAddress);
         LOG.info("Initiated BMP server at {}.", localAddress);
         return channelFuture;
     }
 
-    private static class BootstrapListener implements ChannelFutureListener {
+    @Override
+    public synchronized void close() {
+        this.close = true;
+    }
+
+    private class BootstrapListener implements ChannelFutureListener {
         private final Bootstrap bootstrap;
-        private final InetSocketAddress address;
+        private final InetSocketAddress remoteAddress;
+        private final SocketAddress localAddress;
         private long delay;
+        private Timer timer = new Timer();
 
-        BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
+        BootstrapListener(final ChannelFuture channelPromise, final Bootstrap bootstrap,
+                final SocketAddress localAddress, final InetSocketAddress remoteAddress) {
             this.bootstrap = bootstrap;
-            this.address = address;
+            this.remoteAddress = remoteAddress;
+            this.localAddress = localAddress;
             this.delay = INITIAL_BACKOFF;
         }
 
         @Override
-        public void operationComplete(final ChannelFuture cf) throws Exception {
-            if (cf.isCancelled()) {
-                LOG.debug("Connection {} cancelled!", cf);
-            } else if (cf.isSuccess()) {
-                LOG.debug("Connection {} succeeded!", cf);
+        public void operationComplete(final ChannelFuture future) throws Exception {
+            if (future.isCancelled()) {
+                LOG.debug("Connection {} cancelled!", future);
+            } else if (future.isSuccess()) {
+                LOG.debug("Connection {} succeeded!", future);
+                addCloseDetectListener(future.channel());
             } else {
-                final EventLoop loop = cf.channel().eventLoop();
+                final EventLoop loop = future.channel().eventLoop();
                 loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
                 LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
-                        this.address, this.delay);
+                        this.remoteAddress, this.delay);
+            }
+        }
+
+        private void addCloseDetectListener(Channel channel) {
+            //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
+            channel.closeFuture().addListener((ChannelFutureListener) future -> scheduleConnect());
+        }
+
+        private void scheduleConnect() {
+            if (!BmpMockDispatcher.this.close) {
+
+                timer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        createClient(BootstrapListener.this.localAddress,
+                                BmpMockDispatcher.BootstrapListener.this.remoteAddress);
+                    }
+                }, (long) 5);
             }
         }
     }
diff --git a/bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionFactory.java b/bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionFactory.java
new file mode 100644 (file)
index 0000000..cac91ec
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bmp.mock;
+
+import io.netty.channel.Channel;
+import org.opendaylight.protocol.bmp.api.BmpSession;
+import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
+
+public final class BmpMockSessionFactory implements BmpSessionFactory {
+    private final BmpMockArguments arguments;
+
+    public BmpMockSessionFactory(final BmpMockArguments arguments) {
+        this.arguments = arguments;
+    }
+
+    @Override
+    public BmpSession getSession(final Channel channel, final BmpSessionListenerFactory sessionListenerFactory) {
+        return new BmpMockSession(this.arguments.getPeersCount(),
+                this.arguments.getPrePolicyRoutesCount(), this.arguments.getPostPolicyRoutesCount());
+    }
+}
diff --git a/bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionListener.java b/bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionListener.java
new file mode 100644 (file)
index 0000000..8e7fe80
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bmp.mock;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.bmp.api.BmpSession;
+import org.opendaylight.protocol.bmp.api.BmpSessionListener;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+public final class BmpMockSessionListener implements BmpSessionListener {
+    private final LongAdder counter = new LongAdder();
+    @GuardedBy("this")
+    private AtomicBoolean up = new AtomicBoolean(false);
+
+    @Override
+    public void onSessionUp(final BmpSession session) {
+        this.up.set(true);
+    }
+
+    @Override
+    public void onSessionDown(final Exception exception) {
+        this.up.set(false);
+    }
+
+    @Override
+    public void onMessage(final Notification message) {
+        this.counter.increment();
+    }
+
+    public boolean getStatus() {
+        return this.up.get();
+    }
+
+    public long getNumberOfMessagesReceived() {
+        return this.counter.longValue();
+    }
+}
diff --git a/bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionListenerFactory.java b/bmp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSessionListenerFactory.java
new file mode 100644 (file)
index 0000000..0af6222
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bmp.mock;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.protocol.bmp.api.BmpSessionListener;
+import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
+
+public final class BmpMockSessionListenerFactory implements BmpSessionListenerFactory {
+    @Nonnull
+    @Override
+    public BmpSessionListener getSessionListener() {
+        return new BmpMockSessionListener();
+    }
+}
index bce3b06a65d4f67ee11b8254fe084689561cd6bc..cc2c37583273cd85af731f62774b88ce67a727cd 100644 (file)
 
 package org.opendaylight.protocol.bmp.mock;
 
-import static org.opendaylight.protocol.bmp.mock.BmpMockTest.waitFutureComplete;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.opendaylight.protocol.util.CheckUtil.checkEquals;
+import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
 
 import com.google.common.net.InetAddresses;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
 import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
+import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
 import org.opendaylight.protocol.concepts.KeyMapping;
 import org.opendaylight.protocol.util.InetSocketAddressUtil;
 
 public class BmpMockDispatcherTest {
 
-    private final BmpMessageRegistry registry = Mockito.mock(BmpMessageRegistry.class);
-    private final BmpSessionFactory sessionFactory = Mockito.mock(BmpSessionFactory.class);
-    private final BmpSessionListenerFactory slf = Mockito.mock(BmpSessionListenerFactory.class);
+    private final BmpSessionFactory sessionFactory = new DefaultBmpSessionFactory();
+    private final BmpMockSessionListener sl = new BmpMockSessionListener();
+    @Mock
+    private BmpMessageRegistry registry;
+    @Mock
+    private BmpSessionListenerFactory slf;
+    private BmpMockDispatcher bmpMockDispatcher;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        doReturn(this.sl).when(this.slf).getSessionListener();
+        this.bmpMockDispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
+    }
 
     @Test
-    public void testCreateClient() throws InterruptedException {
-        final BmpMockDispatcher dispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
+    public void testCreateClient() throws Exception {
         final int port = InetSocketAddressUtil.getRandomPort();
         final InetSocketAddress serverAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
-        final BmpDispatcherImpl serverDispatcher = new BmpDispatcherImpl(
+
+        final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
                 new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);
-        final ChannelFuture futureServer = serverDispatcher
+        final ChannelFuture futureServer = bmpDispatcher
                 .createServer(serverAddr, this.slf, KeyMapping.getKeyMapping());
-        waitFutureComplete(futureServer);
-        final ChannelFuture channelFuture = dispatcher.createClient(InetSocketAddressUtil
+        waitFutureSuccess(futureServer);
+
+        final ChannelFuture channelFuture = this.bmpMockDispatcher.createClient(InetSocketAddressUtil
                 .getRandomLoopbackInetSocketAddress(0), serverAddr);
-        waitFutureComplete(channelFuture);
+        waitFutureSuccess(channelFuture);
         final Channel channel = channelFuture.sync().channel();
 
-        Assert.assertTrue(channel.isActive());
+        assertTrue(channel.isActive());
+        checkEquals(() -> assertTrue(this.sl.getStatus()));
         channel.close();
-        serverDispatcher.close();
+        bmpDispatcher.close();
+        checkEquals(() -> assertFalse(this.sl.getStatus()));
+
+        final BmpDispatcherImpl bmpDispatcher2 = new BmpDispatcherImpl(
+                new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);
+        final ChannelFuture futureServer2 = bmpDispatcher2
+                .createServer(serverAddr, this.slf, KeyMapping.getKeyMapping());
+        waitFutureSuccess(futureServer2);
+        checkEquals(() -> assertTrue(this.sl.getStatus()));
+
+        bmpDispatcher2.close();
+        this.bmpMockDispatcher.close();
+        checkEquals(() -> assertFalse(this.sl.getStatus()));
     }
 
     @Test
-    public void testCreateServer() throws InterruptedException {
-        final BmpMockDispatcher dispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
+    public void testCreateServer() throws Exception {
         final int port = InetSocketAddressUtil.getRandomPort();
-        final BmpDispatcherImpl serverDispatcher = new BmpDispatcherImpl(
+        final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
                 new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);
-        final ChannelFuture futureServer = dispatcher.createServer(
+        final ChannelFuture futureServer = this.bmpMockDispatcher.createServer(
                 new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port));
-        waitFutureComplete(futureServer);
-        final ChannelFuture channelFuture = serverDispatcher.createClient(
+        waitFutureSuccess(futureServer);
+        final ChannelFuture channelFuture = bmpDispatcher.createClient(
                 InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port), this.slf, KeyMapping.getKeyMapping());
-        waitFutureComplete(channelFuture);
+        waitFutureSuccess(channelFuture);
         final Channel channel = channelFuture.sync().channel();
 
-        Assert.assertTrue(channel.isActive());
+        assertTrue(channel.isActive());
+        checkEquals(() -> assertTrue(this.sl.getStatus()));
+        assertTrue(futureServer.channel().isActive());
         channel.close();
-        serverDispatcher.close();
+
+        bmpDispatcher.close();
+        this.bmpMockDispatcher.close();
+        checkEquals(() -> assertFalse(this.sl.getStatus()));
     }
 
 }
index a846ee7d32d9802799e772c10117b67998263420..64cdc3db12b4ac5f137de962f6f56bfb40e115e0 100644 (file)
@@ -8,12 +8,13 @@
 
 package org.opendaylight.protocol.bmp.mock;
 
-import com.google.common.util.concurrent.Uninterruptibles;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
@@ -44,11 +45,11 @@ public class BmpMockTest {
     public void setUp() {
         final BmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
         this.bmpActivator = new BmpActivator(
-                ServiceLoaderBGPExtensionProviderContext.getSingletonInstance());
+            ServiceLoaderBGPExtensionProviderContext.getSingletonInstance());
         this.bmpActivator.start(ctx);
         this.bmpDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
-                ctx.getBmpMessageRegistry(),
-                new DefaultBmpSessionFactory());
+            ctx.getBmpMessageRegistry(),
+            new DefaultBmpSessionFactory());
     }
 
     @After
@@ -63,7 +64,7 @@ public class BmpMockTest {
         final BmpSessionListenerFactory bmpSessionListenerFactory = () -> BmpMockTest.this.sessionListener;
         final ChannelFuture futureServer = this.bmpDispatcher.createServer(serverAddr,
                 bmpSessionListenerFactory, KeyMapping.getKeyMapping());
-        waitFutureComplete(futureServer);
+        waitFutureSuccess(futureServer);
         final Channel serverChannel;
         final int sessionUpWait;
         if (futureServer.isSuccess()) {
@@ -82,12 +83,12 @@ public class BmpMockTest {
             "--pre_policy_routes",
             "3"});
 
-        Mockito.verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
+        verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
                 .onSessionUp(Mockito.any(BmpSession.class));
         //1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
-        Mockito.verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
-                .times(13))
-                .onMessage(Mockito.any(Notification.class));
+        verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
+            .times(13))
+            .onMessage(Mockito.any(Notification.class));
 
         if (serverChannel != null) {
             serverChannel.close().sync();
@@ -104,7 +105,7 @@ public class BmpMockTest {
             "--peers_count", "3", "--pre_policy_routes", "3", "--passive"});
         final ChannelFuture futureServer = this.bmpDispatcher.createClient(serverAddr,
                 bmpSessionListenerFactory, KeyMapping.getKeyMapping());
-        waitFutureComplete(futureServer);
+        waitFutureSuccess(futureServer);
         final Channel serverChannel;
         final int sessionUpWait;
         if (futureServer.isSuccess()) {
@@ -116,21 +117,15 @@ public class BmpMockTest {
             sessionUpWait = 40;
         }
 
-        Mockito.verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
-                .onSessionUp(Mockito.any(BmpSession.class));
+        verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
+            .onSessionUp(Mockito.any(BmpSession.class));
         //1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
-        Mockito.verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
-                .times(13))
-                .onMessage(Mockito.any(Notification.class));
+        verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
+            .times(13))
+            .onMessage(Mockito.any(Notification.class));
 
         if (serverChannel != null) {
             serverChannel.close().sync();
         }
     }
-
-    static void waitFutureComplete(final ChannelFuture future) throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        future.addListener(future1 -> latch.countDown());
-        Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
-    }
 }
index fe5950f07f10164b011a3da39bd05961851c74ea..7345e5f03fac39b54383c74f8fb0c72701c41b72 100644 (file)
@@ -9,9 +9,17 @@
 package org.opendaylight.protocol.bmp.api;
 
 import io.netty.channel.Channel;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 public interface BmpSessionFactory {
-
-    BmpSession getSession(Channel channel, BmpSessionListenerFactory sessionListenerFactory);
-
+    /**
+     * Creates Bmp Session.
+     *
+     * @param channel                generated channel
+     * @param sessionListenerFactory listener factory
+     * @return bmp session
+     */
+    @Nonnull BmpSession getSession(@Nonnull Channel channel,
+            @Nullable BmpSessionListenerFactory sessionListenerFactory);
 }
index 034a27d3e3194a149a1cb4d1b630392aeb88e821..52744f18f7efd54284e8115d460ad8feec4af0d6 100644 (file)
@@ -8,8 +8,11 @@
 
 package org.opendaylight.protocol.bmp.api;
 
+import javax.annotation.Nonnull;
+
 public interface BmpSessionListenerFactory {
 
+    @Nonnull
     BmpSessionListener getSessionListener();
 
 }