Fix another state-keeping thinko
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / ReconnectPromise.java
index a3de2d436aecfe9d8d83af628bf72183c9a7bb22..4537fcdc59fd65f7f59f729dc3c686689dab913c 100644 (file)
@@ -9,48 +9,76 @@ package org.opendaylight.netconf.nettyutil;
 
 import static java.util.Objects.requireNonNull;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.opendaylight.netconf.api.NetconfSessionListener;
+import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher.PipelineInitializer;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Deprecated
 final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionListener<? super S>>
-        extends DefaultPromise<Void> {
+        extends DefaultPromise<Empty> implements ReconnectFuture {
     private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
 
     private final AbstractNetconfDispatcher<S, L> dispatcher;
     private final InetSocketAddress address;
     private final ReconnectStrategyFactory strategyFactory;
     private final Bootstrap bootstrap;
-    private final AbstractNetconfDispatcher.PipelineInitializer<S> initializer;
+    private final PipelineInitializer<S> initializer;
+    private final Promise<Empty> firstSessionFuture;
+
+    @GuardedBy("this")
     private Future<?> pending;
 
     ReconnectPromise(final EventExecutor executor, final AbstractNetconfDispatcher<S, L> dispatcher,
             final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
-            final Bootstrap bootstrap, final AbstractNetconfDispatcher.PipelineInitializer<S> initializer) {
+            final Bootstrap bootstrap, final PipelineInitializer<S> initializer) {
         super(executor);
-        this.bootstrap = bootstrap;
+        this.firstSessionFuture = new DefaultPromise<>(executor);
+        this.bootstrap = requireNonNull(bootstrap);
         this.initializer = requireNonNull(initializer);
         this.dispatcher = requireNonNull(dispatcher);
         this.address = requireNonNull(address);
         this.strategyFactory = requireNonNull(connectStrategyFactory);
     }
 
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            firstSessionFuture.cancel(mayInterruptIfRunning);
+            pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public Future<?> firstSessionFuture() {
+        return firstSessionFuture;
+    }
+
     synchronized void connect() {
-        final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+        lockedConnect();
+    }
+
+    @Holding("this")
+    private void lockedConnect() {
+        final ReconnectStrategy cs = strategyFactory.createReconnectStrategy();
 
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support
         // reconnect attempts
-        pending = this.dispatcher.createClient(this.address, cs, bootstrap, (channel, promise) -> {
+        pending = dispatcher.createClient(address, cs, bootstrap, (channel, promise) -> {
             initializer.initializeChannel(channel, promise);
             // add closed channel handler
             // This handler has to be added as last channel handler and the channel inactive event has to be caught by
@@ -59,64 +87,38 @@ final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionL
             // reconnect will not work
             // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource
             // cleanup) before a new connection is started
-            channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
+            channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+                @Override
+                public void channelInactive(final ChannelHandlerContext ctx) {
+                    onChannelInactive();
+                }
+            });
         });
 
-        pending.addListener(future -> {
-            if (!future.isSuccess() && !ReconnectPromise.this.isDone()) {
-                ReconnectPromise.this.setFailure(future.cause());
-            }
-        });
-    }
-
-    /**
-     * Indicate if the initial connection succeeded.
-     *
-     * @return true if initial connection was established successfully, false if initial connection failed due to e.g.
-     *         Connection refused, Negotiation failed
-     */
-    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-            justification = "https://github.com/spotbugs/spotbugs/issues/811")
-    private synchronized boolean isInitialConnectFinished() {
-        requireNonNull(pending);
-        return pending.isDone() && pending.isSuccess();
-    }
-
-    @Override
-    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
-        if (super.cancel(mayInterruptIfRunning)) {
-            requireNonNull(pending);
-            this.pending.cancel(mayInterruptIfRunning);
-            return true;
+        if (!firstSessionFuture.isDone()) {
+            pending.addListener(future -> {
+                if (!future.isSuccess() && !firstSessionFuture.isDone()) {
+                    firstSessionFuture.setFailure(future.cause());
+                }
+            });
         }
-
-        return false;
     }
 
-    /**
-     * Channel handler that responds to channelInactive event and reconnects the session.
-     * Only if the promise was not canceled.
-     */
-    private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
-        private final ReconnectPromise<?, ?> promise;
-
-        ClosedChannelHandler(final ReconnectPromise<?, ?> promise) {
-            this.promise = promise;
+    private void onChannelInactive() {
+        // This is the ultimate channel inactive handler, not forwarding
+        if (isCancelled()) {
+            return;
         }
 
-        @Override
-        public void channelInactive(final ChannelHandlerContext ctx) {
-            // This is the ultimate channel inactive handler, not forwarding
-            if (promise.isCancelled()) {
-                return;
-            }
-
-            if (promise.isInitialConnectFinished() == false) {
-                LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
+        synchronized (this) {
+            final Future<?> attempt = pending;
+            if (!attempt.isDone() || !attempt.isSuccess()) {
+                // Connection refused, negotiation failed, or similar
+                LOG.debug("Connection to {} was dropped during negotiation, reattempting", address);
             }
 
-            LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
-            promise.connect();
+            LOG.debug("Reconnecting after connection to {} was dropped", address);
+            lockedConnect();
         }
     }
 }