package org.opendaylight.protocol.bmp.impl;
+import static java.util.Objects.requireNonNull;
+import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createClientBootstrap;
+import static org.opendaylight.protocol.bmp.impl.BmpDispatcherUtil.createServerBootstrap;
+
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.bmp.api.BmpDispatcher;
import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
private static final Logger LOG = LoggerFactory.getLogger(BmpDispatcherImpl.class);
- private static final int MAX_CONNECTIONS_COUNT = 128;
-
private static final int CONNECT_TIMEOUT = 5000;
private static final int INITIAL_BACKOFF = 30_000;
private static final int MAXIMUM_BACKOFF = 720_000;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final BmpSessionFactory sessionFactory;
+ @GuardedBy("this")
+ private boolean close;
public BmpDispatcherImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup,
final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
this.bossGroup = new EpollEventLoopGroup();
this.workerGroup = new EpollEventLoopGroup();
} else {
- this.bossGroup = Preconditions.checkNotNull(bossGroup);
- this.workerGroup = Preconditions.checkNotNull(workerGroup);
+ this.bossGroup = requireNonNull(bossGroup);
+ this.workerGroup = requireNonNull(workerGroup);
}
- this.hf = new BmpHandlerFactory(Preconditions.checkNotNull(registry));
- this.sessionFactory = Preconditions.checkNotNull(sessionFactory);
+ this.hf = new BmpHandlerFactory(requireNonNull(registry));
+ this.sessionFactory = requireNonNull(sessionFactory);
}
@Override
- public ChannelFuture createClient(final InetSocketAddress address, final BmpSessionListenerFactory slf, final Optional<KeyMapping> keys) {
-
- final Bootstrap b = new Bootstrap();
-
- Preconditions.checkNotNull(address);
-
- if (Epoll.isAvailable()) {
- b.channel(EpollSocketChannel.class);
- } else {
- b.channel(NioSocketChannel.class);
- }
- if (keys.isPresent()) {
- if (Epoll.isAvailable()) {
- b.option(EpollChannelOption.TCP_MD5SIG, keys.get());
- } else {
- throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
- }
- }
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
- b.group(this.workerGroup);
-
- b.handler(new ChannelInitializer<AbstractChannel>() {
- @Override
- protected void initChannel(final AbstractChannel ch) throws Exception {
- ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
- }
- });
-
- b.remoteAddress(address);
- final ChannelFuture channelPromise = b.connect();
- channelPromise.addListener(new BmpDispatcherImpl.BootstrapListener(b, address));
+ public ChannelFuture createClient(final InetSocketAddress remoteAddress, final BmpSessionListenerFactory slf,
+ final Optional<KeyMapping> keys) {
+ final Bootstrap bootstrap = createClientBootstrap(this.sessionFactory, this.hf,
+ BmpDispatcherUtil::createChannelWithDecoder, slf, remoteAddress, this.workerGroup,
+ CONNECT_TIMEOUT, keys);
+ final ChannelFuture channelPromise = bootstrap.connect();
+ channelPromise.addListener(new BootstrapListener(bootstrap, remoteAddress, slf, keys));
+ LOG.debug("Initiated BMP Client {} at {}.", channelPromise, remoteAddress);
return channelPromise;
}
@Override
- public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf, final Optional<KeyMapping> keys) {
- Preconditions.checkNotNull(address);
- Preconditions.checkNotNull(slf);
-
- final ServerBootstrap b = new ServerBootstrap();
- b.childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(final Channel ch) throws Exception {
- ch.pipeline().addLast(BmpDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast(BmpDispatcherImpl.this.sessionFactory.getSession(ch, slf));
- }
- });
-
- b.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
- b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
- if (Epoll.isAvailable()) {
- b.channel(EpollServerSocketChannel.class);
- } else {
- b.channel(NioServerSocketChannel.class);
- }
-
- if (keys.isPresent()) {
- if (Epoll.isAvailable()) {
- b.option(EpollChannelOption.TCP_MD5SIG, keys.get());
- } else {
- throw new UnsupportedOperationException (Epoll.unavailabilityCause().getCause());
- }
- }
- b.group(this.bossGroup, this.workerGroup);
- final ChannelFuture f = b.bind(address);
-
- LOG.debug("Initiated BMP server {} at {}.", f, address);
- return f;
+ public ChannelFuture createServer(final InetSocketAddress address, final BmpSessionListenerFactory slf,
+ final Optional<KeyMapping> keys) {
+ final ServerBootstrap serverBootstrap = createServerBootstrap(this.sessionFactory, this.hf, slf,
+ BmpDispatcherUtil::createChannelWithDecoder, this.bossGroup, this.workerGroup, keys);
+ final ChannelFuture channelFuture = serverBootstrap.bind(address);
+ LOG.debug("Initiated BMP server {} at {}.", channelFuture, address);
+ return channelFuture;
}
@Override
- public void close() {
+ public synchronized void close() {
+ this.close = true;
if (Epoll.isAvailable()) {
this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
}
private class BootstrapListener implements ChannelFutureListener {
-
private final Bootstrap bootstrap;
-
+ private final InetSocketAddress remoteAddress;
+ private final BmpSessionListenerFactory slf;
+ private final Optional<KeyMapping> keys;
private long delay;
+ private Timer timer = new Timer();
- private final InetSocketAddress address;
-
- public BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress address) {
+ BootstrapListener(final Bootstrap bootstrap, final InetSocketAddress remoteAddress,
+ final BmpSessionListenerFactory slf, final Optional<KeyMapping> keys) {
this.bootstrap = bootstrap;
- this.address = address;
+ this.remoteAddress = remoteAddress;
this.delay = INITIAL_BACKOFF;
+ this.slf = slf;
+ this.keys = keys;
}
@Override
- public void operationComplete(final ChannelFuture cf) throws Exception {
- if (cf.isCancelled()) {
- LOG.debug("Connection {} cancelled!", cf);
- } else if (cf.isSuccess()) {
- LOG.debug("Connection {} succeeded!", cf);
+ public void operationComplete(final ChannelFuture future) throws Exception {
+ if (future.isCancelled()) {
+ LOG.debug("Connection {} cancelled!", future);
+ } else if (future.isSuccess()) {
+ LOG.debug("Connection {} succeeded!", future);
+ addCloseDetectListener(future.channel());
} else {
if (this.delay > MAXIMUM_BACKOFF) {
- LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP router {}.", this.address);
- cf.cancel(false);
+ LOG.warn("The time of maximum backoff has been exceeded. No further connection attempts with BMP " +
+ "router {}.", this.remoteAddress);
+ future.cancel(false);
return;
}
- final EventLoop loop = cf.channel().eventLoop();
+ final EventLoop loop = future.channel().eventLoop();
loop.schedule(() -> this.bootstrap.connect().addListener(this), this.delay, TimeUnit.MILLISECONDS);
- LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.", this.address, this.delay);
+ LOG.info("The connection try to BMP router {} failed. Next reconnection attempt in {} milliseconds.",
+ this.remoteAddress, this.delay);
this.delay *= 2;
}
}
+
+ private void addCloseDetectListener(Channel channel) {
+ //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
+ channel.closeFuture().addListener((ChannelFutureListener) future -> scheduleConnect());
+ }
+
+ private void scheduleConnect() {
+ if (!BmpDispatcherImpl.this.close) {
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ createClient(BootstrapListener.this.remoteAddress, BootstrapListener.this.slf,
+ BootstrapListener.this.keys);
+ }
+ }, (long) 5);
+ }
+ }
}
}