after succesful connection goes down.
Remove duplicate code.
Change-Id: I8690de7d6a49c6c92e319c840a37a1fe043b9775
Signed-off-by: Claudio D. Gasparini <claudio.gasparini@pantheon.tech>
package org.opendaylight.protocol.bmp.impl;
import static java.util.Objects.requireNonNull;
+import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createClientBootstrap;
+import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createServerBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.bmp.api.BmpDispatcher;
import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
private static final Logger LOG = LoggerFactory.getLogger(BmpDispatcherImpl.class);
- private static final int MAX_CONNECTIONS_COUNT = 128;
-
private static final int CONNECT_TIMEOUT = 5000;
private static final int INITIAL_BACKOFF = 30_000;
private static final int MAXIMUM_BACKOFF = 720_000;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final BmpSessionFactory sessionFactory;
+ @GuardedBy("this")
+ private boolean close;
public BmpDispatcherImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup,
final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
}
@Override
- public ChannelFuture createClient(final InetSocketAddress address, final BmpSessionListenerFactory slf,
- final KeyMapping keys) {
-
- final Bootstrap b = new Bootstrap();
-
- requireNonNull(address);
-
- if (Epoll.isAvailable()) {
- b.channel(EpollSocketChannel.class);
- } else {
- b.channel(NioSocketChannel.class);
- }
- if (!keys.isEmpty()) {
- if (Epoll.isAvailable()) {
- b.option(EpollChannelOption.TCP_MD5SIG, keys);
- } else {
- throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
- }
- }
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
- b.group(this.workerGroup);
-
- b.handler(new ChannelInitializer<AbstractChannel>() {
- @Override
- protected void initChannel(final AbstractChannel ch) throws Exception {
- ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
- }
- });
-
- b.remoteAddress(address);
- final ChannelFuture channelPromise = b.connect();
- channelPromise.addListener(new BmpDispatcherImpl.BootstrapListener(b, address));
+ public ChannelFuture createClient(final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf,
+ final KeyMapping keys) {
+ final Bootstrap bootstrap = createClientBootstrap(this.sessionFactory, this.hf,
+ BmpDispatcherUtil::createChannelWithDecoder, slf, remoteAddress, this.workerGroup,
+ CONNECT_TIMEOUT, keys);
+ final ChannelFuture channelPromise = bootstrap.connect();
+ channelPromise.addListener(new BootstrapListener(channelPromise, bootstrap, remoteAddress, slf, keys));
+ LOG.debug("Initiated BMP Client {} at {}.", channelPromise, remoteAddress);
return channelPromise;
}
@Override
public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf,
- final KeyMapping keys) {
- requireNonNull(address);
- requireNonNull(slf);
-
- final ServerBootstrap b = new ServerBootstrap();
- b.childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(final Channel ch) throws Exception {
- ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
- }
- });
-
- b.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
- b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
- if (Epoll.isAvailable()) {
- b.channel(EpollServerSocketChannel.class);
- } else {
- b.channel(NioServerSocketChannel.class);
- }
-
- if (!keys.isEmpty()) {
- if (Epoll.isAvailable()) {
- b.option(EpollChannelOption.TCP_MD5SIG, keys);
- } else {
- throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
- }
- }
- b.group(this.bossGroup, this.workerGroup);
- final ChannelFuture f = b.bind(address);
-
- LOG.debug("Initiated BMP server {} at {}.", f, address);
- return f;
+ final KeyMapping keys) {
+ final ServerBootstrap serverBootstrap = createServerBootstrap(this.sessionFactory, this.hf, slf,
+ BmpDispatcherUtil::createChannelWithDecoder, this.bossGroup, this.workerGroup, keys);
+ final ChannelFuture channelFuture = serverBootstrap.bind(address);
+ LOG.debug("Initiated BMP server {} at {}.", channelFuture, address);
+ return channelFuture;
}
@Override
- public void close() {
+ public synchronized void close() {
+ this.close = true;
if (Epoll.isAvailable()) {
this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
}
private class BootstrapListener implements ChannelFutureListener {
-
private final Bootstrap bootstrap;
-
+ private final InetSocketAddress remoteAddress;
+ private final BmpSessionListenerFactory slf;
+ private final KeyMapping keys;
private long delay;
+ private Timer timer = new Timer();
- private final InetSocketAddress address;
-
- public BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
+ BootstrapListener(final ChannelFuture channelPromise, final Bootstrap bootstrap,
+ final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf, final KeyMapping keys) {
this.bootstrap = bootstrap;
- this.address = address;
+ this.remoteAddress = remoteAddress;
this.delay = INITIAL_BACKOFF;
+ this.slf = slf;
+ this.keys = keys;
}
@Override
- public void operationComplete(final ChannelFuture cf) throws Exception {
- if (cf.isCancelled()) {
- LOG.debug("Connection {} cancelled!", cf);
- } else if (cf.isSuccess()) {
- LOG.debug("Connection {} succeeded!", cf);
+ public void operationComplete(final ChannelFuture future) throws Exception {
+ if (future.isCancelled()) {
+ LOG.debug("Connection {} cancelled!", future);
+ } else if (future.isSuccess()) {
+ LOG.debug("Connection {} succeeded!", future);
+ addCloseDetectListener(future.channel());
} else {
if (this.delay > MAXIMUM_BACKOFF) {
LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP " +
- "router {}.", this.address);
- cf.cancel(false);
+ "router {}.", this.remoteAddress);
+ future.cancel(false);
return;
}
- final EventLoop loop = cf.channel().eventLoop();
+ final EventLoop loop = future.channel().eventLoop();
loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
- this.address, this.delay);
+ this.remoteAddress, this.delay);
this.delay *= 2;
}
}
+
+ private void addCloseDetectListener(Channel channel) {
+ //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
+ channel.closeFuture().addListener((ChannelFutureListener) future -> scheduleConnect());
+ }
+
+ private void scheduleConnect() {
+ if (!BmpDispatcherImpl.this.close) {
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ createClient(BootstrapListener.this.remoteAddress, BootstrapListener.this.slf,
+ BootstrapListener.this.keys);
+ }
+ }, (long) 5);
+ }
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bmp.impl;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
+import org.opendaylight.protocol.concepts.KeyMapping;
+
+public final class BmpDispatcherUtil {
+ private static final int MAX_CONNECTIONS_COUNT = 128;
+
+ private BmpDispatcherUtil() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static ChannelInitializer<AbstractChannel> createChannelWithDecoder(
+ @Nonnull final BmpSessionFactory sessionFactory,
+ @Nonnull final BmpHandlerFactory hf,
+ @Nullable final BmpSessionListenerFactory slf) {
+ return new ChannelInitializer<AbstractChannel>() {
+ @Override
+ protected void initChannel(final AbstractChannel ch) throws Exception {
+ ch.pipeline().addLast(hf.getDecoders());
+ ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
+ }
+ };
+ }
+
+ public static ChannelInitializer<AbstractChannel> createChannelWithEncoder(
+ @Nonnull final BmpSessionFactory sessionFactory,
+ @Nonnull final BmpHandlerFactory hf,
+ @Nullable final BmpSessionListenerFactory slf) {
+ return new ChannelInitializer<AbstractChannel>() {
+ @Override
+ protected void initChannel(final AbstractChannel ch) throws Exception {
+ ch.pipeline().addLast(hf.getEncoders());
+ ch.pipeline().addLast(sessionFactory.getSession(ch, slf));
+ }
+ };
+ }
+
+ /**
+ * To be used by BMP Dispatcher mainly.
+ */
+ public static ServerBootstrap createServerBootstrap(
+ @Nonnull final BmpSessionFactory sessionFactory,
+ @Nonnull final BmpHandlerFactory hf,
+ @Nullable final BmpSessionListenerFactory slf,
+ @Nonnull CreateChannel createChannel,
+ @Nonnull final EventLoopGroup bossGroup,
+ @Nonnull final EventLoopGroup workerGroup,
+ @Nonnull final KeyMapping keys) {
+ return createServerBootstrap(sessionFactory, hf, slf, createChannel, bossGroup, workerGroup, keys,
+ true);
+ }
+
+ public static ServerBootstrap createServerBootstrap(
+ @Nonnull final BmpSessionFactory sessionFactory,
+ @Nonnull final BmpHandlerFactory hf,
+ @Nullable final BmpSessionListenerFactory slf,
+ @Nonnull CreateChannel createChannel,
+ @Nonnull final EventLoopGroup bossGroup,
+ @Nonnull final EventLoopGroup workerGroup,
+ @Nonnull final KeyMapping keys,
+ boolean tryEpollSocket) {
+
+ final ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.childHandler(createChannel.create(sessionFactory, hf, slf));
+ serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
+ serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ serverBootstrap.group(bossGroup, workerGroup);
+
+ if (!tryEpollSocket) {
+ serverBootstrap.channel(NioServerSocketChannel.class);
+ } else {
+ if (Epoll.isAvailable()) {
+ serverBootstrap.channel(EpollServerSocketChannel.class);
+ } else {
+ serverBootstrap.channel(NioServerSocketChannel.class);
+ }
+
+ if (!keys.isEmpty()) {
+ if (Epoll.isAvailable()) {
+ serverBootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
+ } else {
+ throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
+ }
+ }
+ }
+
+ return serverBootstrap;
+ }
+
+ /**
+ * To be used by BMP Dispatcher mainly.
+ */
+ public static Bootstrap createClientBootstrap(
+ @Nonnull final BmpSessionFactory sessionFactory,
+ @Nonnull final BmpHandlerFactory hf,
+ @Nonnull CreateChannel createChannel,
+ @Nonnull final BmpSessionListenerFactory slf,
+ @Nonnull final InetSocketAddress remoteAddress,
+ @Nonnull final EventLoopGroup workerGroup,
+ final int connectTimeout,
+ @Nonnull final KeyMapping keys) {
+ return createClientBootstrap(sessionFactory, hf, createChannel, slf, remoteAddress, null,
+ workerGroup, connectTimeout, keys, false, true);
+ }
+
+ public static Bootstrap createClientBootstrap(
+ @Nonnull final BmpSessionFactory sessionFactory,
+ @Nonnull final BmpHandlerFactory hf,
+ @Nonnull CreateChannel createChannel,
+ @Nonnull final BmpSessionListenerFactory slf,
+ @Nonnull final InetSocketAddress remoteAddress,
+ @Nullable final SocketAddress localAddress,
+ @Nonnull final EventLoopGroup workerGroup,
+ final int connectTimeout,
+ @Nonnull final KeyMapping keys,
+ boolean reuseAddress,
+ boolean tryEpollSocket) {
+ final Bootstrap bootstrap = new Bootstrap();
+ bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
+ bootstrap.group(workerGroup);
+ bootstrap.handler(createChannel.create(sessionFactory, hf, slf));
+ if (localAddress != null) {
+ bootstrap.localAddress(localAddress);
+ }
+ bootstrap.remoteAddress(remoteAddress);
+
+ if (!tryEpollSocket) {
+ bootstrap.channel(NioSocketChannel.class);
+
+ } else {
+ if (Epoll.isAvailable()) {
+ bootstrap.channel(EpollSocketChannel.class);
+ } else {
+ bootstrap.channel(NioSocketChannel.class);
+ }
+ if (!keys.isEmpty()) {
+ if (Epoll.isAvailable()) {
+ bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
+ } else {
+ throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
+ }
+ }
+ }
+ return bootstrap;
+ }
+
+ @FunctionalInterface
+ public interface CreateChannel {
+ ChannelInitializer<AbstractChannel> create(
+ @Nonnull final BmpSessionFactory sessionFactory,
+ @Nonnull final BmpHandlerFactory hf,
+ @Nullable final BmpSessionListenerFactory slf);
+
+ }
+}
return router;
});
- waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
+ waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil
+ .createInitMsg("description", "name", "some info")));
readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
assertFalse(monitor.getRouter().isEmpty());
final BmpExtensionProviderActivator bmpActivator = new BmpActivator(bgpCtx);
bmpActivator.start(ctx);
- return new BmpMockDispatcher(ctx.getBmpMessageRegistry(), (channel, sessionListenerFactory) ->
- new BmpMockSession(arguments.getPeersCount(),
- arguments.getPrePolicyRoutesCount(), arguments.getPostPolicyRoutesCount()));
+ return new BmpMockDispatcher(ctx.getBmpMessageRegistry(), new BmpMockSessionFactory(arguments));
}
private static void deployClients(final BmpMockDispatcher dispatcher, final BmpMockArguments arguments) {
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil;
import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
+import org.opendaylight.protocol.concepts.KeyMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class BmpMockDispatcher {
+final class BmpMockDispatcher implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
private static final int CONNECT_TIMEOUT = 2000;
- private static final int MAX_CONNECTIONS_COUNT = 128;
private static final int INITIAL_BACKOFF = 15_000;
+ private static final KeyMapping KEY_MAPPING = KeyMapping.getKeyMapping();
private final BmpHandlerFactory hf;
private final BmpSessionFactory sessionFactory;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private final BmpMockSessionListenerFactory slf;
+ @GuardedBy("this")
+ private boolean close;
BmpMockDispatcher(final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
this.sessionFactory = requireNonNull(sessionFactory);
+ this.slf = new BmpMockSessionListenerFactory();
requireNonNull(registry);
this.hf = new BmpHandlerFactory(registry);
}
- private Bootstrap createClientInstance(final SocketAddress localAddress) {
- final NioEventLoopGroup workergroup = new NioEventLoopGroup();
- final Bootstrap bootstrap = new Bootstrap();
-
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
- bootstrap.option(ChannelOption.SO_REUSEADDR, true);
- bootstrap.group(workergroup);
-
- bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
- @Override
- protected void initChannel(final NioSocketChannel ch) throws Exception {
- ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
- ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
- }
- });
- bootstrap.localAddress(localAddress);
- return bootstrap;
- }
-
- ChannelFuture createClient(final SocketAddress localAddress, final InetSocketAddress remoteAddress) {
- requireNonNull(localAddress);
- requireNonNull(remoteAddress);
-
- // ideally we should use Bootstrap clones here
- final Bootstrap bootstrap = createClientInstance(localAddress);
- bootstrap.remoteAddress(remoteAddress);
+ ChannelFuture createClient(@Nonnull final SocketAddress localAddress,
+ @Nonnull final InetSocketAddress remoteAddress) {
+ final Bootstrap bootstrap = BmpDispatcherUtil.createClientBootstrap(this.sessionFactory, this.hf,
+ BmpDispatcherUtil::createChannelWithEncoder, this.slf, remoteAddress, localAddress, this.workerGroup,
+ CONNECT_TIMEOUT, KEY_MAPPING, true, false);
final ChannelFuture channelFuture = bootstrap.connect(remoteAddress);
LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
- channelFuture.addListener(new BootstrapListener(bootstrap, remoteAddress));
+ channelFuture.addListener(new BootstrapListener(channelFuture, bootstrap, localAddress, remoteAddress));
return channelFuture;
}
- private ServerBootstrap createServerInstance() {
- final ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(final Channel ch) throws Exception {
- ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
- ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
- }
- });
-
- serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
- serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.group(this.bossGroup, this.workerGroup);
- return serverBootstrap;
- }
-
ChannelFuture createServer(final InetSocketAddress localAddress) {
requireNonNull(localAddress);
- final ServerBootstrap serverBootstrap = createServerInstance();
+ final ServerBootstrap serverBootstrap = BmpDispatcherUtil.createServerBootstrap(this.sessionFactory,
+ this.hf, this.slf, BmpDispatcherUtil::createChannelWithEncoder,
+ this.bossGroup, this.workerGroup, KEY_MAPPING, false);
final ChannelFuture channelFuture = serverBootstrap.bind(localAddress);
LOG.info("Initiated BMP server at {}.", localAddress);
return channelFuture;
}
- private static class BootstrapListener implements ChannelFutureListener {
+ @Override
+ public synchronized void close() {
+ this.close = true;
+ }
+
+ private class BootstrapListener implements ChannelFutureListener {
private final Bootstrap bootstrap;
- private final InetSocketAddress address;
+ private final InetSocketAddress remoteAddress;
+ private final SocketAddress localAddress;
private long delay;
+ private Timer timer = new Timer();
- BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
+ BootstrapListener(final ChannelFuture channelPromise, final Bootstrap bootstrap,
+ final SocketAddress localAddress, final InetSocketAddress remoteAddress) {
this.bootstrap = bootstrap;
- this.address = address;
+ this.remoteAddress = remoteAddress;
+ this.localAddress = localAddress;
this.delay = INITIAL_BACKOFF;
}
@Override
- public void operationComplete(final ChannelFuture cf) throws Exception {
- if (cf.isCancelled()) {
- LOG.debug("Connection {} cancelled!", cf);
- } else if (cf.isSuccess()) {
- LOG.debug("Connection {} succeeded!", cf);
+ public void operationComplete(final ChannelFuture future) throws Exception {
+ if (future.isCancelled()) {
+ LOG.debug("Connection {} cancelled!", future);
+ } else if (future.isSuccess()) {
+ LOG.debug("Connection {} succeeded!", future);
+ addCloseDetectListener(future.channel());
} else {
- final EventLoop loop = cf.channel().eventLoop();
+ final EventLoop loop = future.channel().eventLoop();
loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
- this.address, this.delay);
+ this.remoteAddress, this.delay);
+ }
+ }
+
+ private void addCloseDetectListener(Channel channel) {
+ //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
+ channel.closeFuture().addListener((ChannelFutureListener) future -> scheduleConnect());
+ }
+
+ private void scheduleConnect() {
+ if (!BmpMockDispatcher.this.close) {
+
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ createClient(BootstrapListener.this.localAddress,
+ BmpMockDispatcher.BootstrapListener.this.remoteAddress);
+ }
+ }, (long) 5);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bmp.mock;
+
+import io.netty.channel.Channel;
+import org.opendaylight.protocol.bmp.api.BmpSession;
+import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
+
+public final class BmpMockSessionFactory implements BmpSessionFactory {
+ private final BmpMockArguments arguments;
+
+ public BmpMockSessionFactory(final BmpMockArguments arguments) {
+ this.arguments = arguments;
+ }
+
+ @Override
+ public BmpSession getSession(final Channel channel, final BmpSessionListenerFactory sessionListenerFactory) {
+ return new BmpMockSession(this.arguments.getPeersCount(),
+ this.arguments.getPrePolicyRoutesCount(), this.arguments.getPostPolicyRoutesCount());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bmp.mock;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.bmp.api.BmpSession;
+import org.opendaylight.protocol.bmp.api.BmpSessionListener;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+public final class BmpMockSessionListener implements BmpSessionListener {
+ private final LongAdder counter = new LongAdder();
+ @GuardedBy("this")
+ private AtomicBoolean up = new AtomicBoolean(false);
+
+ @Override
+ public void onSessionUp(final BmpSession session) {
+ this.up.set(true);
+ }
+
+ @Override
+ public void onSessionDown(final Exception exception) {
+ this.up.set(false);
+ }
+
+ @Override
+ public void onMessage(final Notification message) {
+ this.counter.increment();
+ }
+
+ public boolean getStatus() {
+ return this.up.get();
+ }
+
+ public long getNumberOfMessagesReceived() {
+ return this.counter.longValue();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bmp.mock;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.protocol.bmp.api.BmpSessionListener;
+import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
+
+public final class BmpMockSessionListenerFactory implements BmpSessionListenerFactory {
+ @Nonnull
+ @Override
+ public BmpSessionListener getSessionListener() {
+ return new BmpMockSessionListener();
+ }
+}
package org.opendaylight.protocol.bmp.mock;
-import static org.opendaylight.protocol.bmp.mock.BmpMockTest.waitFutureComplete;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.opendaylight.protocol.util.CheckUtil.checkEquals;
+import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
import com.google.common.net.InetAddresses;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
+import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
import org.opendaylight.protocol.concepts.KeyMapping;
import org.opendaylight.protocol.util.InetSocketAddressUtil;
public class BmpMockDispatcherTest {
- private final BmpMessageRegistry registry = Mockito.mock(BmpMessageRegistry.class);
- private final BmpSessionFactory sessionFactory = Mockito.mock(BmpSessionFactory.class);
- private final BmpSessionListenerFactory slf = Mockito.mock(BmpSessionListenerFactory.class);
+ private final BmpSessionFactory sessionFactory = new DefaultBmpSessionFactory();
+ private final BmpMockSessionListener sl = new BmpMockSessionListener();
+ @Mock
+ private BmpMessageRegistry registry;
+ @Mock
+ private BmpSessionListenerFactory slf;
+ private BmpMockDispatcher bmpMockDispatcher;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ doReturn(this.sl).when(this.slf).getSessionListener();
+ this.bmpMockDispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
+ }
@Test
- public void testCreateClient() throws InterruptedException {
- final BmpMockDispatcher dispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
+ public void testCreateClient() throws Exception {
final int port = InetSocketAddressUtil.getRandomPort();
final InetSocketAddress serverAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
- final BmpDispatcherImpl serverDispatcher = new BmpDispatcherImpl(
+
+ final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);
- final ChannelFuture futureServer = serverDispatcher
+ final ChannelFuture futureServer = bmpDispatcher
.createServer(serverAddr, this.slf, KeyMapping.getKeyMapping());
- waitFutureComplete(futureServer);
- final ChannelFuture channelFuture = dispatcher.createClient(InetSocketAddressUtil
+ waitFutureSuccess(futureServer);
+
+ final ChannelFuture channelFuture = this.bmpMockDispatcher.createClient(InetSocketAddressUtil
.getRandomLoopbackInetSocketAddress(0), serverAddr);
- waitFutureComplete(channelFuture);
+ waitFutureSuccess(channelFuture);
final Channel channel = channelFuture.sync().channel();
- Assert.assertTrue(channel.isActive());
+ assertTrue(channel.isActive());
+ checkEquals(() -> assertTrue(this.sl.getStatus()));
channel.close();
- serverDispatcher.close();
+ bmpDispatcher.close();
+ checkEquals(() -> assertFalse(this.sl.getStatus()));
+
+ final BmpDispatcherImpl bmpDispatcher2 = new BmpDispatcherImpl(
+ new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);
+ final ChannelFuture futureServer2 = bmpDispatcher2
+ .createServer(serverAddr, this.slf, KeyMapping.getKeyMapping());
+ waitFutureSuccess(futureServer2);
+ checkEquals(() -> assertTrue(this.sl.getStatus()));
+
+ bmpDispatcher2.close();
+ this.bmpMockDispatcher.close();
+ checkEquals(() -> assertFalse(this.sl.getStatus()));
}
@Test
- public void testCreateServer() throws InterruptedException {
- final BmpMockDispatcher dispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
+ public void testCreateServer() throws Exception {
final int port = InetSocketAddressUtil.getRandomPort();
- final BmpDispatcherImpl serverDispatcher = new BmpDispatcherImpl(
+ final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);
- final ChannelFuture futureServer = dispatcher.createServer(
+ final ChannelFuture futureServer = this.bmpMockDispatcher.createServer(
new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port));
- waitFutureComplete(futureServer);
- final ChannelFuture channelFuture = serverDispatcher.createClient(
+ waitFutureSuccess(futureServer);
+ final ChannelFuture channelFuture = bmpDispatcher.createClient(
InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port), this.slf, KeyMapping.getKeyMapping());
- waitFutureComplete(channelFuture);
+ waitFutureSuccess(channelFuture);
final Channel channel = channelFuture.sync().channel();
- Assert.assertTrue(channel.isActive());
+ assertTrue(channel.isActive());
+ checkEquals(() -> assertTrue(this.sl.getStatus()));
+ assertTrue(futureServer.channel().isActive());
channel.close();
- serverDispatcher.close();
+
+ bmpDispatcher.close();
+ this.bmpMockDispatcher.close();
+ checkEquals(() -> assertFalse(this.sl.getStatus()));
}
}
package org.opendaylight.protocol.bmp.mock;
-import com.google.common.util.concurrent.Uninterruptibles;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
+
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
public void setUp() {
final BmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
this.bmpActivator = new BmpActivator(
- ServiceLoaderBGPExtensionProviderContext.getSingletonInstance());
+ ServiceLoaderBGPExtensionProviderContext.getSingletonInstance());
this.bmpActivator.start(ctx);
this.bmpDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
- ctx.getBmpMessageRegistry(),
- new DefaultBmpSessionFactory());
+ ctx.getBmpMessageRegistry(),
+ new DefaultBmpSessionFactory());
}
@After
final BmpSessionListenerFactory bmpSessionListenerFactory = () -> BmpMockTest.this.sessionListener;
final ChannelFuture futureServer = this.bmpDispatcher.createServer(serverAddr,
bmpSessionListenerFactory, KeyMapping.getKeyMapping());
- waitFutureComplete(futureServer);
+ waitFutureSuccess(futureServer);
final Channel serverChannel;
final int sessionUpWait;
if (futureServer.isSuccess()) {
"--pre_policy_routes",
"3"});
- Mockito.verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
+ verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
.onSessionUp(Mockito.any(BmpSession.class));
//1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
- Mockito.verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
- .times(13))
- .onMessage(Mockito.any(Notification.class));
+ verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
+ .times(13))
+ .onMessage(Mockito.any(Notification.class));
if (serverChannel != null) {
serverChannel.close().sync();
"--peers_count", "3", "--pre_policy_routes", "3", "--passive"});
final ChannelFuture futureServer = this.bmpDispatcher.createClient(serverAddr,
bmpSessionListenerFactory, KeyMapping.getKeyMapping());
- waitFutureComplete(futureServer);
+ waitFutureSuccess(futureServer);
final Channel serverChannel;
final int sessionUpWait;
if (futureServer.isSuccess()) {
sessionUpWait = 40;
}
- Mockito.verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
- .onSessionUp(Mockito.any(BmpSession.class));
+ verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
+ .onSessionUp(Mockito.any(BmpSession.class));
//1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
- Mockito.verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
- .times(13))
- .onMessage(Mockito.any(Notification.class));
+ verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
+ .times(13))
+ .onMessage(Mockito.any(Notification.class));
if (serverChannel != null) {
serverChannel.close().sync();
}
}
-
- static void waitFutureComplete(final ChannelFuture future) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- future.addListener(future1 -> latch.countDown());
- Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
- }
}
package org.opendaylight.protocol.bmp.api;
import io.netty.channel.Channel;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
public interface BmpSessionFactory {
-
- BmpSession getSession(Channel channel, BmpSessionListenerFactory sessionListenerFactory);
-
+ /**
+ * Creates Bmp Session.
+ *
+ * @param channel generated channel
+ * @param sessionListenerFactory listener factory
+ * @return bmp session
+ */
+ @Nonnull BmpSession getSession(@Nonnull Channel channel,
+ @Nullable BmpSessionListenerFactory sessionListenerFactory);
}
package org.opendaylight.protocol.bmp.api;
+import javax.annotation.Nonnull;
+
public interface BmpSessionListenerFactory {
+ @Nonnull
BmpSessionListener getSessionListener();
}