import com.google.common.base.Preconditions;
-final class ReconnectPromise<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends DefaultPromise<Void> {
- private final AbstractDispatcher dispatcher;
+final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+ private final AbstractDispatcher<S, L> dispatcher;
private final InetSocketAddress address;
- private final L listener;
- private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
- private final ProtocolMessageFactory<M> messageFactory;
private final ReconnectStrategyFactory strategyFactory;
private final ReconnectStrategy strategy;
private Future<?> pending;
+ private final SessionListenerFactory<L> lfactory;
- public ReconnectPromise(final AbstractDispatcher dispatcher,
- final InetSocketAddress address, final L listener,
- final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
- final ProtocolMessageFactory<M> messageFactory,
- final ReconnectStrategyFactory connectStrategyFactory,
- final ReconnectStrategy reestablishStrategy) {
+ public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
+ final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
+ final SessionListenerFactory<L> lfactory) {
this.dispatcher = Preconditions.checkNotNull(dispatcher);
this.address = Preconditions.checkNotNull(address);
- this.listener = Preconditions.checkNotNull(listener);
- this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
- this.messageFactory = Preconditions.checkNotNull(messageFactory);
this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
this.strategy = Preconditions.checkNotNull(reestablishStrategy);
+ this.lfactory = Preconditions.checkNotNull(lfactory);
}
synchronized void connect() {
- final ReconnectStrategy cs = strategyFactory.createReconnectStrategy();
+ final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
final ReconnectStrategy rs = new ReconnectStrategy() {
@Override
public Future<Void> scheduleReconnect(final Throwable cause) {
@Override
public int getConnectTimeout() throws Exception {
final int cst = cs.getConnectTimeout();
- final int rst = strategy.getConnectTimeout();
+ final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
if (cst == 0) {
return rst;
}
};
- final Future<S> cf = dispatcher.createClient(address,
- listener, negotiatorFactory, messageFactory, rs);
+ final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.lfactory);
final Object lock = this;
- pending = cf;
+ this.pending = cf;
cf.addListener(new FutureListener<S>() {
@Override
public void operationComplete(final Future<S> future) {
synchronized (lock) {
if (!future.isSuccess()) {
- final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
- pending = rf;
+ final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
+ ReconnectPromise.this.pending = rf;
rf.addListener(new FutureListener<Void>() {
@Override
* FIXME: we have a slight race window with cancellation
* here. Analyze and define its semantics.
*/
- strategy.reconnectSuccessful();
+ ReconnectPromise.this.strategy.reconnectSuccessful();
setSuccess(null);
}
}
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
- pending.cancel(mayInterruptIfRunning);
+ this.pending.cancel(mayInterruptIfRunning);
return true;
}