Cleanup ReconnectPromise a bit 87/97387/5
authorRobert Varga <robert.varga@pantheon.tech>
Sat, 4 Sep 2021 00:48:41 +0000 (02:48 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sat, 4 Sep 2021 01:11:38 +0000 (03:11 +0200)
ClosedChannelHandler is really a stateless object used by
ReconnectPromise wiring. Acknowledge that fact by removing the class in
favor of a field initialized by an anonoymous inner class.

While we are here, also document locking rules and generally tighten
things up, eliminating a SpotBugs suppression in the process.

Change-Id: I6abc0ddf296dd9e2f5ea6de5a709003cf14edc7a
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectPromise.java

index a3de2d436aecfe9d8d83af628bf72183c9a7bb22..d69250ae5275d73dc632a35e0b927a663394fe35 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.netconf.nettyutil;
 
 import static java.util.Objects.requireNonNull;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -17,8 +16,11 @@ import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import java.net.InetSocketAddress;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.opendaylight.netconf.api.NetconfSessionListener;
+import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher.PipelineInitializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,26 +33,66 @@ final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionL
     private final InetSocketAddress address;
     private final ReconnectStrategyFactory strategyFactory;
     private final Bootstrap bootstrap;
-    private final AbstractNetconfDispatcher.PipelineInitializer<S> initializer;
+    private final PipelineInitializer<S> initializer;
+    /**
+     * Channel handler that responds to channelInactive event and reconnects the session unless the promise is
+     * cancelled.
+     */
+    private final ChannelInboundHandlerAdapter inboundHandler = new ChannelInboundHandlerAdapter() {
+        @Override
+        public void channelInactive(final ChannelHandlerContext ctx) {
+            // This is the ultimate channel inactive handler, not forwarding
+            if (isCancelled()) {
+                return;
+            }
+
+            synchronized (ReconnectPromise.this) {
+                final Future<?> attempt = pending;
+                if (!attempt.isDone() || !attempt.isSuccess()) {
+                    // Connection refused, negotiation failed, or similar
+                    LOG.debug("Connection to {} was dropped during negotiation, reattempting", address);
+                }
+
+                LOG.debug("Reconnecting after connection to {} was dropped", address);
+                lockedConnect();
+            }
+        }
+    };
+
+    @GuardedBy("this")
     private Future<?> pending;
 
     ReconnectPromise(final EventExecutor executor, final AbstractNetconfDispatcher<S, L> dispatcher,
             final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
-            final Bootstrap bootstrap, final AbstractNetconfDispatcher.PipelineInitializer<S> initializer) {
+            final Bootstrap bootstrap, final PipelineInitializer<S> initializer) {
         super(executor);
-        this.bootstrap = bootstrap;
+        this.bootstrap = requireNonNull(bootstrap);
         this.initializer = requireNonNull(initializer);
         this.dispatcher = requireNonNull(dispatcher);
         this.address = requireNonNull(address);
         this.strategyFactory = requireNonNull(connectStrategyFactory);
     }
 
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
+        return false;
+    }
+
     synchronized void connect() {
-        final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+        lockedConnect();
+    }
+
+    @Holding("this")
+    private void lockedConnect() {
+        final ReconnectStrategy cs = strategyFactory.createReconnectStrategy();
 
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support
         // reconnect attempts
-        pending = this.dispatcher.createClient(this.address, cs, bootstrap, (channel, promise) -> {
+        pending = dispatcher.createClient(address, cs, bootstrap, (channel, promise) -> {
             initializer.initializeChannel(channel, promise);
             // add closed channel handler
             // This handler has to be added as last channel handler and the channel inactive event has to be caught by
@@ -59,64 +101,13 @@ final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionL
             // reconnect will not work
             // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource
             // cleanup) before a new connection is started
-            channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
+            channel.pipeline().addLast(inboundHandler);
         });
 
         pending.addListener(future -> {
-            if (!future.isSuccess() && !ReconnectPromise.this.isDone()) {
-                ReconnectPromise.this.setFailure(future.cause());
+            if (!future.isSuccess() && !isDone()) {
+                setFailure(future.cause());
             }
         });
     }
-
-    /**
-     * Indicate if the initial connection succeeded.
-     *
-     * @return true if initial connection was established successfully, false if initial connection failed due to e.g.
-     *         Connection refused, Negotiation failed
-     */
-    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-            justification = "https://github.com/spotbugs/spotbugs/issues/811")
-    private synchronized boolean isInitialConnectFinished() {
-        requireNonNull(pending);
-        return pending.isDone() && pending.isSuccess();
-    }
-
-    @Override
-    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
-        if (super.cancel(mayInterruptIfRunning)) {
-            requireNonNull(pending);
-            this.pending.cancel(mayInterruptIfRunning);
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Channel handler that responds to channelInactive event and reconnects the session.
-     * Only if the promise was not canceled.
-     */
-    private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
-        private final ReconnectPromise<?, ?> promise;
-
-        ClosedChannelHandler(final ReconnectPromise<?, ?> promise) {
-            this.promise = promise;
-        }
-
-        @Override
-        public void channelInactive(final ChannelHandlerContext ctx) {
-            // This is the ultimate channel inactive handler, not forwarding
-            if (promise.isCancelled()) {
-                return;
-            }
-
-            if (promise.isInitialConnectFinished() == false) {
-                LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
-            }
-
-            LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
-            promise.connect();
-        }
-    }
 }