Eliminate ReconnectingStrategyListener
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / NetconfSessionPromise.java
index fec85ae871714e275dbc201f20eafd23a7dfb681..6136452a3bf29233d22f238c145a7e43efa4ca10 100644 (file)
@@ -7,25 +7,23 @@
  */
 package org.opendaylight.netconf.nettyutil;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelOption;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Deprecated
-@ThreadSafe
 final class NetconfSessionPromise<S extends NetconfSession> extends DefaultPromise<S> {
     private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionPromise.class);
     private final ReconnectStrategy strategy;
@@ -38,26 +36,25 @@ final class NetconfSessionPromise<S extends NetconfSession> extends DefaultPromi
     NetconfSessionPromise(final EventExecutor executor, final InetSocketAddress address,
             final ReconnectStrategy strategy, final Bootstrap bootstrap) {
         super(executor);
-        this.strategy = Preconditions.checkNotNull(strategy);
-        this.address = Preconditions.checkNotNull(address);
-        this.bootstrap = Preconditions.checkNotNull(bootstrap);
+        this.strategy = requireNonNull(strategy);
+        this.address = requireNonNull(address);
+        this.bootstrap = requireNonNull(bootstrap);
     }
 
     @SuppressWarnings("checkstyle:illegalCatch")
     synchronized void connect() {
         try {
-            final int timeout = this.strategy.getConnectTimeout();
+            final int timeout = strategy.getConnectTimeout();
 
             LOG.debug("Promise {} attempting connect for {}ms", this, timeout);
 
-            if (this.address.isUnresolved()) {
-                this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
+            if (address.isUnresolved()) {
+                address = new InetSocketAddress(address.getHostName(), address.getPort());
             }
-            this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
-            final ChannelFuture connectFuture = this.bootstrap.connect(this.address);
+            final ChannelFuture connectFuture = bootstrap.connect(address);
             // Add listener that attempts reconnect by invoking this method again.
-            connectFuture.addListener(new BootstrapConnectListener());
-            this.pending = connectFuture;
+            connectFuture.addListener((ChannelFutureListener) this::channelConnectComplete);
+            pending = connectFuture;
         } catch (final Exception e) {
             LOG.info("Failed to connect to {}", address, e);
             setFailure(e);
@@ -67,7 +64,7 @@ final class NetconfSessionPromise<S extends NetconfSession> extends DefaultPromi
     @Override
     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
         if (super.cancel(mayInterruptIfRunning)) {
-            this.pending.cancel(mayInterruptIfRunning);
+            pending.cancel(mayInterruptIfRunning);
             return true;
         }
 
@@ -77,73 +74,62 @@ final class NetconfSessionPromise<S extends NetconfSession> extends DefaultPromi
     @Override
     public synchronized Promise<S> setSuccess(final S result) {
         LOG.debug("Promise {} completed", this);
-        this.strategy.reconnectSuccessful();
+        strategy.reconnectSuccessful();
         return super.setSuccess(result);
     }
 
-    private class BootstrapConnectListener implements ChannelFutureListener {
-        @Override
-        public void operationComplete(final ChannelFuture cf) {
-            synchronized (NetconfSessionPromise.this) {
-
-                LOG.debug("Promise {} connection resolved", NetconfSessionPromise.this);
-
-                // Triggered when a connection attempt is resolved.
-                Preconditions.checkState(NetconfSessionPromise.this.pending.equals(cf));
-
-                /*
-                 * The promise we gave out could have been cancelled,
-                 * which cascades to the connect getting cancelled,
-                 * but there is a slight race window, where the connect
-                 * is already resolved, but the listener has not yet
-                 * been notified -- cancellation at that point won't
-                 * stop the notification arriving, so we have to close
-                 * the race here.
-                 */
-                if (isCancelled()) {
-                    if (cf.isSuccess()) {
-                        LOG.debug("Closing channel for cancelled promise {}", NetconfSessionPromise.this);
-                        cf.channel().close();
-                    }
-                    return;
-                }
-
-                if (cf.isSuccess()) {
-                    LOG.debug("Promise {} connection successful", NetconfSessionPromise.this);
-                    return;
-                }
-
-                LOG.debug("Attempt to connect to {} failed", NetconfSessionPromise.this.address, cf.cause());
-
-                final Future<Void> rf = NetconfSessionPromise.this.strategy.scheduleReconnect(cf.cause());
-                rf.addListener(new ReconnectingStrategyListener());
-                NetconfSessionPromise.this.pending = rf;
+    // Triggered when a connection attempt is resolved.
+    private synchronized void channelConnectComplete(final ChannelFuture cf) {
+        LOG.debug("Promise {} connection resolved", this);
+        checkState(pending.equals(cf));
+
+        /*
+         * The promise we gave out could have been cancelled,
+         * which cascades to the connect getting cancelled,
+         * but there is a slight race window, where the connect
+         * is already resolved, but the listener has not yet
+         * been notified -- cancellation at that point won't
+         * stop the notification arriving, so we have to close
+         * the race here.
+         */
+        if (isCancelled()) {
+            if (cf.isSuccess()) {
+                LOG.debug("Closing channel for cancelled promise {}", this);
+                cf.channel().close();
             }
+            return;
+        }
+
+        if (cf.isSuccess()) {
+            LOG.debug("Promise {} connection successful", this);
+            return;
         }
 
-        private class ReconnectingStrategyListener implements FutureListener<Void> {
-            @Override
-            public void operationComplete(final Future<Void> sf) {
-                synchronized (NetconfSessionPromise.this) {
-                    // Triggered when a connection attempt is to be made.
-                    Preconditions.checkState(NetconfSessionPromise.this.pending.equals(sf));
-
-                    /*
-                     * The promise we gave out could have been cancelled,
-                     * which cascades to the reconnect attempt getting
-                     * cancelled, but there is a slight race window, where
-                     * the reconnect attempt is already enqueued, but the
-                     * listener has not yet been notified -- if cancellation
-                     * happens at that point, we need to catch it here.
-                     */
-                    if (!isCancelled()) {
-                        if (sf.isSuccess()) {
-                            connect();
-                        } else {
-                            setFailure(sf.cause());
-                        }
-                    }
-                }
+        LOG.debug("Attempt to connect to {} failed", address, cf.cause());
+
+        final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
+        rf.addListener(this::reconnectFutureComplete);
+        pending = rf;
+    }
+
+    // Triggered when a connection attempt is to be made.
+    private synchronized void reconnectFutureComplete(final Future<?> sf) {
+        LOG.debug("Promise {} strategy triggered reconnect", this);
+        checkState(pending.equals(sf));
+
+        /*
+         * The promise we gave out could have been cancelled,
+         * which cascades to the reconnect attempt getting
+         * cancelled, but there is a slight race window, where
+         * the reconnect attempt is already enqueued, but the
+         * listener has not yet been notified -- if cancellation
+         * happens at that point, we need to catch it here.
+         */
+        if (!isCancelled()) {
+            if (sf.isSuccess()) {
+                connect();
+            } else {
+                setFailure(sf.cause());
             }
         }
     }