+
+ private class BootstrapConnectListener implements ChannelFutureListener {
+ private final Object lock;
+
+ public BootstrapConnectListener(final Object lock) {
+ this.lock = lock;
+ }
+
+ @Override
+ public void operationComplete(final ChannelFuture cf) throws Exception {
+ synchronized (lock) {
+
+ LOG.debug("Promise {} connection resolved", lock);
+
+ // Triggered when a connection attempt is resolved.
+ Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf));
+
+ /*
+ * The promise we gave out could have been cancelled,
+ * which cascades to the connect getting cancelled,
+ * but there is a slight race window, where the connect
+ * is already resolved, but the listener has not yet
+ * been notified -- cancellation at that point won't
+ * stop the notification arriving, so we have to close
+ * the race here.
+ */
+ if (isCancelled()) {
+ if (cf.isSuccess()) {
+ LOG.debug("Closing channel for cancelled promise {}", lock);
+ cf.channel().close();
+ }
+ return;
+ }
+
+ if(cf.isSuccess()) {
+ LOG.debug("Promise {} connection successful", lock);
+ return;
+ }
+
+ LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause());
+
+ final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
+ rf.addListener(new ReconnectingStrategyListener());
+ ProtocolSessionPromise.this.pending = rf;
+ }
+ }
+
+ private class ReconnectingStrategyListener implements FutureListener<Void> {
+ @Override
+ public void operationComplete(final Future<Void> sf) {
+ synchronized (lock) {
+ // Triggered when a connection attempt is to be made.
+ Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf));
+
+ /*
+ * The promise we gave out could have been cancelled,
+ * which cascades to the reconnect attempt getting
+ * cancelled, but there is a slight race window, where
+ * the reconnect attempt is already enqueued, but the
+ * listener has not yet been notified -- if cancellation
+ * happens at that point, we need to catch it here.
+ */
+ if (!isCancelled()) {
+ if (sf.isSuccess()) {
+ connect();
+ } else {
+ setFailure(sf.cause());
+ }
+ }
+ }
+ }
+ }
+
+ }
+