import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
private final BGPHandlerFactory handlerFactory;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
- private final EventExecutor executor;
private Optional<KeyMapping> keys;
public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MD5ChannelFactory<?> cf, final MD5ServerChannelFactory<?> scf) {
this.bossGroup = Preconditions.checkNotNull(bossGroup);
this.workerGroup = Preconditions.checkNotNull(workerGroup);
- this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
this.handlerFactory = new BGPHandlerFactory(messageRegistry);
this.channelFactory = cf;
this.serverChannelFactory = scf;
final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
final Bootstrap bootstrap = createClientBootStrap();
- final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
- bootstrap.handler(BGPChannel.createChannelInitializer(initializer, sessionPromise));
+ final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, strategy, bootstrap);
+ bootstrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
sessionPromise.connect();
LOG.debug("Client created.");
return sessionPromise;
final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry);
this.keys = keys;
final Bootstrap bootstrap = createClientBootStrap();
- final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE, address,
+ final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, address,
connectStrategyFactory, bootstrap, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
reconnectPromise.connect();
this.keys = Optional.absent();
private ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
+ final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
+ serverBootstrap.childHandler(serverChannelHandler);
+
serverBootstrap.option(ChannelOption.SO_BACKLOG, Integer.valueOf(SOCKET_BACKLOG_SIZE));
serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
};
}
- public static <S extends BGPSession> ChannelHandler createChannelInitializer(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
+ public static <S extends BGPSession> ChannelHandler createClientChannelHandler(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel channel) {
}
};
}
+
+ public static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) {
+ return new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(final SocketChannel channel) {
+ initializer.initializeChannel(channel, new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
+ }
+ };
+ }
}
}
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import javax.annotation.concurrent.GuardedBy;
@GuardedBy("this")
private Future<?> pending;
- public BGPProtocolSessionPromise(EventExecutor executor, InetSocketAddress address, ReconnectStrategy strategy, Bootstrap bootstrap) {
- super(executor);
+ public BGPProtocolSessionPromise(InetSocketAddress address, ReconnectStrategy strategy, Bootstrap bootstrap) {
+ super(GlobalEventExecutor.INSTANCE);
this.strategy = Preconditions.checkNotNull(strategy);
this.address = Preconditions.checkNotNull(address);
this.bootstrap = Preconditions.checkNotNull(bootstrap);
private final ReconnectStrategyFactory strategyFactory;
private final Bootstrap bootstrap;
private final ChannelPipelineInitializer initializer;
- private final EventExecutor executor;
private Future<S> pending;
public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap bootstrap,
final ChannelPipelineInitializer initializer) {
super(executor);
- this.executor = executor;
this.bootstrap = bootstrap;
this.initializer = Preconditions.checkNotNull(initializer);
this.address = Preconditions.checkNotNull(address);
public Future<S> connectSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap,
final ChannelPipelineInitializer initializer) {
- final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
+ final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, strategy, bootstrap);
final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel channel) {