import static java.util.Objects.requireNonNull;
+import com.google.common.base.Optional;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
+import org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil;
import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class BmpMockDispatcher {
+final class BmpMockDispatcher implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
private static final int CONNECT_TIMEOUT = 2000;
- private static final int MAX_CONNECTIONS_COUNT = 128;
private static final int INITIAL_BACKOFF = 15_000;
private final BmpHandlerFactory hf;
private final BmpSessionFactory sessionFactory;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private final BmpMockSessionListenerFactory slf;
+ @GuardedBy("this")
+ private boolean close;
BmpMockDispatcher(final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
this.sessionFactory = requireNonNull(sessionFactory);
+ this.slf = new BmpMockSessionListenerFactory();
requireNonNull(registry);
this.hf = new BmpHandlerFactory(registry);
}
- private Bootstrap createClientInstance(final SocketAddress localAddress) {
- final NioEventLoopGroup workergroup = new NioEventLoopGroup();
- final Bootstrap bootstrap = new Bootstrap();
-
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
- bootstrap.option(ChannelOption.SO_REUSEADDR, true);
- bootstrap.group(workergroup);
-
- bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
- @Override
- protected void initChannel(final NioSocketChannel ch) throws Exception {
- ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
- ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
- }
- });
- bootstrap.localAddress(localAddress);
- return bootstrap;
- }
-
- ChannelFuture createClient(final SocketAddress localAddress, final InetSocketAddress remoteAddress) {
- requireNonNull(localAddress);
- requireNonNull(remoteAddress);
-
- // ideally we should use Bootstrap clones here
- final Bootstrap bootstrap = createClientInstance(localAddress);
- bootstrap.remoteAddress(remoteAddress);
+ ChannelFuture createClient(@Nonnull final SocketAddress localAddress,
+ @Nonnull final InetSocketAddress remoteAddress) {
+ final Bootstrap bootstrap = BmpDispatcherUtil.createClientBootstrap(this.sessionFactory, this.hf,
+ BmpDispatcherUtil::createChannelWithEncoder, this.slf, remoteAddress, localAddress, this.workerGroup,
+ CONNECT_TIMEOUT, Optional.absent(), true, false);
final ChannelFuture channelFuture = bootstrap.connect(remoteAddress);
LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
- channelFuture.addListener(new BootstrapListener(bootstrap, remoteAddress));
+ channelFuture.addListener(new BootstrapListener(bootstrap, localAddress, remoteAddress));
return channelFuture;
}
- private ServerBootstrap createServerInstance() {
- final ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(final Channel ch) throws Exception {
- ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
- ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
- }
- });
-
- serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
- serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.group(this.bossGroup, this.workerGroup);
- return serverBootstrap;
- }
-
ChannelFuture createServer(final InetSocketAddress localAddress) {
requireNonNull(localAddress);
- final ServerBootstrap serverBootstrap = createServerInstance();
+ final ServerBootstrap serverBootstrap = BmpDispatcherUtil.createServerBootstrap(this.sessionFactory,
+ this.hf, this.slf, BmpDispatcherUtil::createChannelWithEncoder,
+ this.bossGroup, this.workerGroup, Optional.absent(), false);
final ChannelFuture channelFuture = serverBootstrap.bind(localAddress);
LOG.info("Initiated BMP server at {}.", localAddress);
return channelFuture;
}
- private static class BootstrapListener implements ChannelFutureListener {
+ @Override
+ public synchronized void close() {
+ this.close = true;
+ }
+
+ private class BootstrapListener implements ChannelFutureListener {
private final Bootstrap bootstrap;
- private final InetSocketAddress address;
+ private final InetSocketAddress remoteAddress;
+ private final SocketAddress localAddress;
private long delay;
+ private Timer timer = new Timer();
- BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
+ BootstrapListener(final Bootstrap bootstrap, final SocketAddress localAddress,
+ final InetSocketAddress remoteAddress) {
this.bootstrap = bootstrap;
- this.address = address;
+ this.remoteAddress = remoteAddress;
+ this.localAddress = localAddress;
this.delay = INITIAL_BACKOFF;
}
@Override
- public void operationComplete(final ChannelFuture cf) throws Exception {
- if (cf.isCancelled()) {
- LOG.debug("Connection {} cancelled!", cf);
- } else if (cf.isSuccess()) {
- LOG.debug("Connection {} succeeded!", cf);
+ public void operationComplete(final ChannelFuture future) throws Exception {
+ if (future.isCancelled()) {
+ LOG.debug("Connection {} cancelled!", future);
+ } else if (future.isSuccess()) {
+ LOG.debug("Connection {} succeeded!", future);
+ addCloseDetectListener(future.channel());
} else {
- final EventLoop loop = cf.channel().eventLoop();
+ final EventLoop loop = future.channel().eventLoop();
loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
- this.address, this.delay);
+ this.remoteAddress, this.delay);
+ }
+ }
+
+ private void addCloseDetectListener(Channel channel) {
+ //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
+ channel.closeFuture().addListener((ChannelFutureListener) future -> scheduleConnect());
+ }
+
+ private void scheduleConnect() {
+ if (!BmpMockDispatcher.this.close) {
+
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ createClient(BootstrapListener.this.localAddress,
+ BmpMockDispatcher.BootstrapListener.this.remoteAddress);
+ }
+ }, (long) 5);
}
}
}