import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import com.google.common.base.Preconditions;
@ThreadSafe
-final class ProtocolSessionPromise<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends DefaultPromise<S> {
+final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
private static final Logger logger = LoggerFactory.getLogger(ProtocolSessionPromise.class);
- private final ChannelInitializerImpl<M, S, L> init;
private final ReconnectStrategy strategy;
private final InetSocketAddress address;
private final Bootstrap b;
@GuardedBy("this")
private Future<?> pending;
- ProtocolSessionPromise(final EventLoopGroup workerGroup, final InetSocketAddress address, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
- final SessionListenerFactory<L> listenerFactory,
- final ProtocolHandlerFactory<?> protocolFactory, final ReconnectStrategy strategy) {
+ ProtocolSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
this.strategy = Preconditions.checkNotNull(strategy);
this.address = Preconditions.checkNotNull(address);
-
- init = new ChannelInitializerImpl<M, S, L>(negotiatorFactory, listenerFactory, protocolFactory, this);
- b = new Bootstrap();
- b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init);
+ this.b = Preconditions.checkNotNull(b);
}
synchronized void connect() {
final Object lock = this;
try {
- final int timeout = strategy.getConnectTimeout();
+ final int timeout = this.strategy.getConnectTimeout();
logger.debug("Promise {} attempting connect for {}ms", lock, timeout);
- b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
- pending = b.connect(address).addListener(new ChannelFutureListener() {
+ this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
+ this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture cf) throws Exception {
synchronized (lock) {
logger.debug("Promise {} connection resolved", lock);
// Triggered when a connection attempt is resolved.
- Preconditions.checkState(pending == cf);
+ Preconditions.checkState(ProtocolSessionPromise.this.pending == cf);
/*
* The promise we gave out could have been cancelled,
}
if (!cf.isSuccess()) {
- final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
+ final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
rf.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(final Future<Void> sf) {
synchronized (lock) {
// Triggered when a connection attempt is to be made.
- Preconditions.checkState(pending == sf);
+ Preconditions.checkState(ProtocolSessionPromise.this.pending == sf);
/*
* The promise we gave out could have been cancelled,
}
});
- pending = rf;
+ ProtocolSessionPromise.this.pending = rf;
} else {
logger.debug("Promise {} connection successful", lock);
}
}
}
});
- } catch (Exception e) {
+ } catch (final Exception e) {
setFailure(e);
}
}
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
- pending.cancel(mayInterruptIfRunning);
+ this.pending.cancel(mayInterruptIfRunning);
return true;
}
@Override
public synchronized Promise<S> setSuccess(final S result) {
logger.debug("Promise {} completed", this);
- strategy.reconnectSuccessful();
+ this.strategy.reconnectSuccessful();
return super.setSuccess(result);
}
-}
\ No newline at end of file
+}