From: Robert Varga Date: Sat, 4 Sep 2021 00:48:41 +0000 (+0200) Subject: Cleanup ReconnectPromise a bit X-Git-Tag: v2.0.4~16 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=82f3e83d6e777a5e76b8b6bca5a8c2646a4b82be;p=netconf.git Cleanup ReconnectPromise a bit ClosedChannelHandler is really a stateless object used by ReconnectPromise wiring. Acknowledge that fact by removing the class in favor of a field initialized by an anonoymous inner class. While we are here, also document locking rules and generally tighten things up, eliminating a SpotBugs suppression in the process. Change-Id: I6abc0ddf296dd9e2f5ea6de5a709003cf14edc7a Signed-off-by: Robert Varga --- diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectPromise.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectPromise.java index a3de2d436a..d69250ae52 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectPromise.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectPromise.java @@ -9,7 +9,6 @@ package org.opendaylight.netconf.nettyutil; import static java.util.Objects.requireNonNull; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -17,8 +16,11 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import java.net.InetSocketAddress; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; import org.opendaylight.netconf.api.NetconfSession; import org.opendaylight.netconf.api.NetconfSessionListener; +import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher.PipelineInitializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,26 +33,66 @@ final class ReconnectPromise initializer; + private final PipelineInitializer initializer; + /** + * Channel handler that responds to channelInactive event and reconnects the session unless the promise is + * cancelled. + */ + private final ChannelInboundHandlerAdapter inboundHandler = new ChannelInboundHandlerAdapter() { + @Override + public void channelInactive(final ChannelHandlerContext ctx) { + // This is the ultimate channel inactive handler, not forwarding + if (isCancelled()) { + return; + } + + synchronized (ReconnectPromise.this) { + final Future attempt = pending; + if (!attempt.isDone() || !attempt.isSuccess()) { + // Connection refused, negotiation failed, or similar + LOG.debug("Connection to {} was dropped during negotiation, reattempting", address); + } + + LOG.debug("Reconnecting after connection to {} was dropped", address); + lockedConnect(); + } + } + }; + + @GuardedBy("this") private Future pending; ReconnectPromise(final EventExecutor executor, final AbstractNetconfDispatcher dispatcher, final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, - final Bootstrap bootstrap, final AbstractNetconfDispatcher.PipelineInitializer initializer) { + final Bootstrap bootstrap, final PipelineInitializer initializer) { super(executor); - this.bootstrap = bootstrap; + this.bootstrap = requireNonNull(bootstrap); this.initializer = requireNonNull(initializer); this.dispatcher = requireNonNull(dispatcher); this.address = requireNonNull(address); this.strategyFactory = requireNonNull(connectStrategyFactory); } + @Override + public synchronized boolean cancel(final boolean mayInterruptIfRunning) { + if (super.cancel(mayInterruptIfRunning)) { + pending.cancel(mayInterruptIfRunning); + return true; + } + return false; + } + synchronized void connect() { - final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy(); + lockedConnect(); + } + + @Holding("this") + private void lockedConnect() { + final ReconnectStrategy cs = strategyFactory.createReconnectStrategy(); // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support // reconnect attempts - pending = this.dispatcher.createClient(this.address, cs, bootstrap, (channel, promise) -> { + pending = dispatcher.createClient(address, cs, bootstrap, (channel, promise) -> { initializer.initializeChannel(channel, promise); // add closed channel handler // This handler has to be added as last channel handler and the channel inactive event has to be caught by @@ -59,64 +101,13 @@ final class ReconnectPromise { - if (!future.isSuccess() && !ReconnectPromise.this.isDone()) { - ReconnectPromise.this.setFailure(future.cause()); + if (!future.isSuccess() && !isDone()) { + setFailure(future.cause()); } }); } - - /** - * Indicate if the initial connection succeeded. - * - * @return true if initial connection was established successfully, false if initial connection failed due to e.g. - * Connection refused, Negotiation failed - */ - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") - private synchronized boolean isInitialConnectFinished() { - requireNonNull(pending); - return pending.isDone() && pending.isSuccess(); - } - - @Override - public synchronized boolean cancel(final boolean mayInterruptIfRunning) { - if (super.cancel(mayInterruptIfRunning)) { - requireNonNull(pending); - this.pending.cancel(mayInterruptIfRunning); - return true; - } - - return false; - } - - /** - * Channel handler that responds to channelInactive event and reconnects the session. - * Only if the promise was not canceled. - */ - private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter { - private final ReconnectPromise promise; - - ClosedChannelHandler(final ReconnectPromise promise) { - this.promise = promise; - } - - @Override - public void channelInactive(final ChannelHandlerContext ctx) { - // This is the ultimate channel inactive handler, not forwarding - if (promise.isCancelled()) { - return; - } - - if (promise.isInitialConnectFinished() == false) { - LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address); - } - - LOG.debug("Reconnecting after connection to {} was dropped", promise.address); - promise.connect(); - } - } }