public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
private static final int SOCKET_BACKLOG_SIZE = 128;
- private final MD5ServerChannelFactory<?> scf;
- private final MD5ChannelFactory<?> cf;
- private final BGPHandlerFactory hf;
+ private final MD5ServerChannelFactory<?> serverChannelFactory;
+ private final MD5ChannelFactory<?> channelFactory;
+ private final BGPHandlerFactory handlerFactory;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final EventExecutor executor;
this.bossGroup = Preconditions.checkNotNull(bossGroup);
this.workerGroup = Preconditions.checkNotNull(workerGroup);
this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
- this.hf = new BGPHandlerFactory(messageRegistry);
- this.cf = cf;
- this.scf = scf;
+ this.handlerFactory = new BGPHandlerFactory(messageRegistry);
+ this.channelFactory = cf;
+ this.serverChannelFactory = scf;
this.keys = null;
}
@Override
public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress address, final BGPPeerRegistry listener, final ReconnectStrategy strategy) {
final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(listener);
- final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf, snf);
-
- final Bootstrap b = new Bootstrap();
- final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, b);
- b.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
- b.handler(BGPChannel.createChannelInitializer(initializer, p));
- this.customizeBootstrap(b);
- setWorkerGroup(b);
- p.connect();
+ final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
+
+ final Bootstrap bootstrap = new Bootstrap();
+ final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
+ bootstrap.handler(BGPChannel.createChannelInitializer(initializer, sessionPromise));
+ this.customizeBootstrap(bootstrap);
+ setWorkerGroup(bootstrap);
+ sessionPromise.connect();
LOG.debug("Client created.");
- return p;
+ return sessionPromise;
}
@Override
final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry);
this.keys = keys;
- final Bootstrap b = new Bootstrap();
- final BGPReconnectPromise p = new BGPReconnectPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE, address,
- connectStrategyFactory, b, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf, snf));
- b.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
- this.customizeBootstrap(b);
- setWorkerGroup(b);
- p.connect();
+ final Bootstrap bootstrap = new Bootstrap();
+ final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE, address,
+ connectStrategyFactory, bootstrap, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
+ this.customizeBootstrap(bootstrap);
+ setWorkerGroup(bootstrap);
+ reconnectPromise.connect();
this.keys = null;
- return p;
+ return reconnectPromise;
}
@Override
public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address) {
final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(registry);
- final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf, snf);
- final ServerBootstrap b = new ServerBootstrap();
- b.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
- b.option(ChannelOption.SO_BACKLOG, Integer.valueOf(SOCKET_BACKLOG_SIZE));
- b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- this.customizeBootstrap(b);
-
- final ChannelFuture f = b.bind(address);
- LOG.debug("Initiated server {} at {}.", f, address);
- return f;
+ final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
+ final ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
+ serverBootstrap.option(ChannelOption.SO_BACKLOG, Integer.valueOf(SOCKET_BACKLOG_SIZE));
+ serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ this.customizeBootstrap(serverBootstrap);
+
+ final ChannelFuture channelFuture = serverBootstrap.bind(address);
+ LOG.debug("Initiated server {} at {}.", channelFuture, address);
+ return channelFuture;
}
- protected void customizeBootstrap(final Bootstrap b) {
+ protected void customizeBootstrap(final Bootstrap bootstrap) {
if (this.keys != null && !this.keys.isEmpty()) {
- if (this.cf == null) {
+ if (this.channelFactory == null) {
throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
}
- b.channelFactory(this.cf);
- b.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
+ bootstrap.channelFactory(this.channelFactory);
+ bootstrap.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
}
// Make sure we are doing round-robin processing
- b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+ bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
}
- private void customizeBootstrap(final ServerBootstrap b) {
+ private void customizeBootstrap(final ServerBootstrap serverBootstrap) {
if (this.keys != null && !this.keys.isEmpty()) {
- if (this.scf == null) {
+ if (this.serverChannelFactory == null) {
throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
}
- b.channelFactory(this.scf);
- b.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
+ serverBootstrap.channelFactory(this.serverChannelFactory);
+ serverBootstrap.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
}
// Make sure we are doing round-robin processing
- b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+ serverBootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
- if (b.group() == null) {
- b.group(this.bossGroup, this.workerGroup);
+ if (serverBootstrap.group() == null) {
+ serverBootstrap.group(this.bossGroup, this.workerGroup);
}
try {
- b.channel(NioServerSocketChannel.class);
+ serverBootstrap.channel(NioServerSocketChannel.class);
} catch (final IllegalStateException e) {
- LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+ LOG.trace("Not overriding channelFactory on bootstrap {}", serverBootstrap, e);
}
}
- private void setWorkerGroup(final Bootstrap b) {
- if (b.group() == null) {
- b.group(this.workerGroup);
+ private void setWorkerGroup(final Bootstrap bootstrap) {
+ if (bootstrap.group() == null) {
+ bootstrap.group(this.workerGroup);
}
try {
- b.channel(NioSocketChannel.class);
+ bootstrap.channel(NioSocketChannel.class);
} catch (final IllegalStateException e) {
- LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+ LOG.trace("Not overriding channelFactory on bootstrap {}", bootstrap, e);
}
}
createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
return new ChannelPipelineInitializer<S>() {
@Override
- public void initializeChannel(final SocketChannel ch, final Promise<S> promise) {
- ch.pipeline().addLast(hf.getDecoders());
- ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(ch, promise));
- ch.pipeline().addLast(hf.getEncoders());
+ public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
+ channel.pipeline().addLast(hf.getDecoders());
+ channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
+ channel.pipeline().addLast(hf.getEncoders());
}
};
}
public static <S extends BGPSession> ChannelHandler createChannelInitializer(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
return new ChannelInitializer<SocketChannel>() {
@Override
- protected void initChannel(final SocketChannel ch) {
- initializer.initializeChannel(ch, promise);
+ protected void initChannel(final SocketChannel channel) {
+ initializer.initializeChannel(channel, promise);
}
};
}
public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
private static final Logger LOG = LoggerFactory.getLogger(BGPProtocolSessionPromise.class);
private final ReconnectStrategy strategy;
- private final Bootstrap b;
+ private final Bootstrap bootstrap;
private InetSocketAddress address;
@GuardedBy("this")
private Future<?> pending;
- public BGPProtocolSessionPromise(EventExecutor executor, InetSocketAddress address, ReconnectStrategy strategy, Bootstrap b) {
+ public BGPProtocolSessionPromise(EventExecutor executor, InetSocketAddress address, ReconnectStrategy strategy, Bootstrap bootstrap) {
super(executor);
this.strategy = Preconditions.checkNotNull(strategy);
this.address = Preconditions.checkNotNull(address);
- this.b = Preconditions.checkNotNull(b);
+ this.bootstrap = Preconditions.checkNotNull(bootstrap);
}
public synchronized void connect() {
final BGPProtocolSessionPromise lock = this;
try {
- int e = this.strategy.getConnectTimeout();
- LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(e));
+ int timeout = this.strategy.getConnectTimeout();
+ LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(timeout));
if (this.address.isUnresolved()) {
this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
}
- this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, e);
- final ChannelFuture connectFuture = this.b.connect(this.address);
+ this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
+ final ChannelFuture connectFuture = this.bootstrap.connect(this.address);
connectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener(lock));
this.pending = connectFuture;
} catch (Exception e) {
}
@Override
- public void operationComplete(final ChannelFuture cf) throws Exception {
+ public void operationComplete(final ChannelFuture channelFuture) throws Exception {
synchronized (this.lock) {
BGPProtocolSessionPromise.LOG.debug("Promise {} connection resolved", this.lock);
- Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(cf));
+ Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(channelFuture));
if (BGPProtocolSessionPromise.this.isCancelled()) {
- if (cf.isSuccess()) {
+ if (channelFuture.isSuccess()) {
BGPProtocolSessionPromise.LOG.debug("Closing channel for cancelled promise {}", this.lock);
- cf.channel().close();
+ channelFuture.channel().close();
}
- } else if (cf.isSuccess()) {
+ } else if (channelFuture.isSuccess()) {
BGPProtocolSessionPromise.LOG.debug("Promise {} connection successful", this.lock);
} else {
- BGPProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed", BGPProtocolSessionPromise.this.address, cf.cause());
- final Future rf = BGPProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
- rf.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener.ReconnectingStrategyListener());
- BGPProtocolSessionPromise.this.pending = rf;
+ BGPProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed", BGPProtocolSessionPromise.this.address, channelFuture.cause());
+ final Future reconnectFuture = BGPProtocolSessionPromise.this.strategy.scheduleReconnect(channelFuture.cause());
+ reconnectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener.ReconnectingStrategyListener());
+ BGPProtocolSessionPromise.this.pending = reconnectFuture;
}
}
}
}
@Override
- public void operationComplete(final Future<Void> sf) {
+ public void operationComplete(final Future<Void> sessionFuture) {
synchronized (BootstrapConnectListener.this.lock) {
- Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(sf));
+ Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(sessionFuture));
if (!BGPProtocolSessionPromise.this.isCancelled()) {
- if (sf.isSuccess()) {
+ if (sessionFuture.isSuccess()) {
BGPProtocolSessionPromise.this.connect();
} else {
- BGPProtocolSessionPromise.this.setFailure(sf.cause());
+ BGPProtocolSessionPromise.this.setFailure(sessionFuture.cause());
}
}
private final InetSocketAddress address;
private final ReconnectStrategyFactory strategyFactory;
- private final Bootstrap b;
+ private final Bootstrap bootstrap;
private final ChannelPipelineInitializer initializer;
private final EventExecutor executor;
private Future<S> pending;
public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
- final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b,
+ final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap bootstrap,
final ChannelPipelineInitializer initializer) {
super(executor);
this.executor = executor;
- this.b = b;
+ this.bootstrap = bootstrap;
this.initializer = Preconditions.checkNotNull(initializer);
this.address = Preconditions.checkNotNull(address);
this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
}
public synchronized void connect() {
- final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+ final ReconnectStrategy reconnectStrategy = this.strategyFactory.createReconnectStrategy();
// Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
- pending = createClient(this.address, cs, b, new ChannelPipelineInitializer<S>() {
+ this.pending = connectSessionPromise(this.address, reconnectStrategy, this.bootstrap, new ChannelPipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
- initializer.initializeChannel(channel, promise);
+ BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
// add closed channel handler
// This handler has to be added as last channel handler and the channel inactive event has to be caught by it
// Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
}
});
- pending.addListener(new GenericFutureListener<Future<Object>>() {
+ this.pending.addListener(new GenericFutureListener<Future<Object>>() {
@Override
- public void operationComplete(Future<Object> future) throws Exception {
+ public void operationComplete(final Future<Object> future) throws Exception {
if (!future.isSuccess()) {
BGPReconnectPromise.this.setFailure(future.cause());
}
});
}
- public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap,
+ public Future<S> connectSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap,
final ChannelPipelineInitializer initializer) {
- final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
+ final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
@Override
- protected void initChannel(SocketChannel ch) {
- initializer.initializeChannel(ch, p);
+ protected void initChannel(final SocketChannel channel) {
+ initializer.initializeChannel(channel, sessionPromise);
}
};
bootstrap.handler(chInit);
- p.connect();
+ sessionPromise.connect();
LOG.debug("Client created.");
- return p;
+ return sessionPromise;
}
/**
* @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
*/
private boolean isInitialConnectFinished() {
- Preconditions.checkNotNull(pending);
- return pending.isDone() && pending.isSuccess();
+ Preconditions.checkNotNull(this.pending);
+ return this.pending.isDone() && this.pending.isSuccess();
}
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
- Preconditions.checkNotNull(pending);
+ Preconditions.checkNotNull(this.pending);
this.pending.cancel(mayInterruptIfRunning);
return true;
}
-
return false;
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
// This is the ultimate channel inactive handler, not forwarding
- if (promise.isCancelled()) {
+ if (this.promise.isCancelled()) {
return;
}
- if (!promise.isInitialConnectFinished()) {
- LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
+ if (!this.promise.isInitialConnectFinished()) {
+ LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
}
- LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
- promise.connect();
+ LOG.debug("Reconnecting after connection to {} was dropped", this.promise.address);
+ this.promise.connect();
}
}
}
\ No newline at end of file