import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
* start method that will handle sockets in different thread.
private final EventLoopGroup workerGroup;
+ private final EventExecutor executor;
+
/**
* Internally creates new instances of NioEventLoopGroup, might deplete system resources and result in Too many open files exception.
*
*/
@Deprecated
protected AbstractDispatcher() {
- this(new NioEventLoopGroup(),new NioEventLoopGroup());
+ this(GlobalEventExecutor.INSTANCE, new NioEventLoopGroup(),new NioEventLoopGroup());
}
protected AbstractDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
- this.bossGroup = bossGroup;
- this.workerGroup = workerGroup;
+ this(GlobalEventExecutor.INSTANCE, bossGroup, workerGroup);
}
+ protected AbstractDispatcher(final EventExecutor executor, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+ this.bossGroup = Preconditions.checkNotNull(bossGroup);
+ this.workerGroup = Preconditions.checkNotNull(workerGroup);
+ this.executor = Preconditions.checkNotNull(executor);
+ }
+
+
/**
* Creates server. Each server needs factories to pass their instances to client sessions.
*
@Override
protected void initChannel(final SocketChannel ch) {
- initializer.initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE));
+ initializer.initializeChannel(ch, new DefaultPromise<S>(executor));
}
});
b.childOption(ChannelOption.SO_KEEPALIVE, true);
*/
protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
final Bootstrap b = new Bootstrap();
- final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
+ final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(executor, address, strategy, b);
b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
new ChannelInitializer<SocketChannel>() {
protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
- final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, initializer);
+ final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, reestablishStrategy, initializer);
p.connect();
return p;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@GuardedBy("this")
private Future<?> pending;
- ProtocolSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
+ ProtocolSessionPromise(final EventExecutor executor, final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
+ super(executor);
this.strategy = Preconditions.checkNotNull(strategy);
this.address = Preconditions.checkNotNull(address);
this.b = Preconditions.checkNotNull(b);
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
- public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
+ public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
final PipelineInitializer<S> initializer) {
-
+ super(executor);
this.dispatcher = Preconditions.checkNotNull(dispatcher);
this.address = Preconditions.checkNotNull(address);
this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
this.initializer = Preconditions.checkNotNull(initializer);
}
- // TODO rafactor
+ // FIXME: BUG-190: refactor
synchronized void connect() {
negotiationFinished.set(false);