import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
-
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.Closeable;
+import java.net.InetSocketAddress;
/**
* Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
*/
public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
+ protected interface PipelineInitializer<S extends ProtocolSession<?>> {
+ /**
+ * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+ * method needs to be implemented in protocol specific Dispatchers.
+ *
+ * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+ * @param promise to be passed to {@link SessionNegotiatorFactory}
+ */
+ public void initializeChannel(SocketChannel channel, Promise<S> promise);
+ }
+
+
private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class);
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
+ /**
+ * Internally creates new instances of NioEventLoopGroup, might deplete system resources and result in Too many open files exception.
+ *
+ * @deprecated use {@link AbstractDispatcher#AbstractDispatcher(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)} instead.
+ */
+ @Deprecated
protected AbstractDispatcher() {
- // FIXME: we should get these as arguments
- this.bossGroup = new NioEventLoopGroup();
- this.workerGroup = new NioEventLoopGroup();
+ this(new NioEventLoopGroup(),new NioEventLoopGroup());
}
- /**
- * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
- * method needs to be implemented in protocol specific Dispatchers.
- *
- * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
- * @param promise to be passed to {@link SessionNegotiatorFactory}
- */
- public abstract void initializeChannel(SocketChannel channel, Promise<S> promise, final SessionListenerFactory<L> lfactory);
+ protected AbstractDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ this.bossGroup = bossGroup;
+ this.workerGroup = workerGroup;
+ }
/**
* Creates server. Each server needs factories to pass their instances to client sessions.
- *
+ *
* @param address address to which the server should be bound
- *
+ * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
+ *
* @return ChannelFuture representing the binding process
*/
- @VisibleForTesting
- public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<L> lfactory) {
+ protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
final ServerBootstrap b = new ServerBootstrap();
b.group(this.bossGroup, this.workerGroup);
b.channel(NioServerSocketChannel.class);
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
- initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE), lfactory);
+ initializer.initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE));
}
});
b.childOption(ChannelOption.SO_KEEPALIVE, true);
/**
* Creates a client.
- *
+ *
* @param address remote address
* @param connectStrategy Reconnection strategy to be used when initial connection fails
- *
+ *
* @return Future representing the connection process. Its result represents the combined success of TCP connection
- * as well as session negotiation.
+ * as well as session negotiation.
*/
- @VisibleForTesting
- public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
- final SessionListenerFactory<L> lfactory) {
+ protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
final Bootstrap b = new Bootstrap();
final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
- initializeChannel(ch, p, lfactory);
+ initializer.initializeChannel(ch, p);
}
});
p.connect();
/**
* Creates a client.
- *
+ *
* @param address remote address
* @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
* @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
- *
+ *
* @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
- * success if it indicates no further attempts should be made and failure if it reports an error
+ * success if it indicates no further attempts should be made and failure if it reports an error
*/
protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
- final ReconnectStrategy reestablishStrategy, final SessionListenerFactory<L> lfactory) {
+ final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
- final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, lfactory);
+ final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, initializer);
p.connect();
return p;
}
- @Override
- public void close() {
- try {
- this.workerGroup.shutdownGracefully();
- } finally {
- this.bossGroup.shutdownGracefully();
- }
- }
+ /**
+ * @deprecated Should only be used with {@link AbstractDispatcher#AbstractDispatcher()}
+ */
+ @Deprecated
+ @Override
+ public void close() {
+ try {
+ this.workerGroup.shutdownGracefully();
+ } finally {
+ this.bossGroup.shutdownGracefully();
+ }
+ }
+
}