X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FNetconfSessionPromise.java;h=6cb4ba59c974ea252ac2de6c467727658c446e4e;hb=a33c469af7ba77c86a89fb1637c6e65bc9cf234b;hp=91d0ba99d526589baeb998ec11fdb4355a203a48;hpb=a201b000f7d777bd7b53748c3f13487fbb398599;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/NetconfSessionPromise.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/NetconfSessionPromise.java index 91d0ba99d5..6cb4ba59c9 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/NetconfSessionPromise.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/NetconfSessionPromise.java @@ -7,11 +7,12 @@ */ package org.opendaylight.netconf.nettyutil; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelOption; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; @@ -36,26 +37,25 @@ final class NetconfSessionPromise extends DefaultPromi NetconfSessionPromise(final EventExecutor executor, final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap) { super(executor); - this.strategy = Preconditions.checkNotNull(strategy); - this.address = Preconditions.checkNotNull(address); - this.bootstrap = Preconditions.checkNotNull(bootstrap); + this.strategy = requireNonNull(strategy); + this.address = requireNonNull(address); + this.bootstrap = requireNonNull(bootstrap); } @SuppressWarnings("checkstyle:illegalCatch") synchronized void connect() { try { - final int timeout = this.strategy.getConnectTimeout(); + final int timeout = strategy.getConnectTimeout(); LOG.debug("Promise {} attempting connect for {}ms", this, timeout); - if (this.address.isUnresolved()) { - this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort()); + if (address.isUnresolved()) { + address = new InetSocketAddress(address.getHostName(), address.getPort()); } - this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); - final ChannelFuture connectFuture = this.bootstrap.connect(this.address); + final ChannelFuture connectFuture = bootstrap.connect(address); // Add listener that attempts reconnect by invoking this method again. - connectFuture.addListener(new BootstrapConnectListener()); - this.pending = connectFuture; + connectFuture.addListener((ChannelFutureListener) this::channelConnectComplete); + pending = connectFuture; } catch (final Exception e) { LOG.info("Failed to connect to {}", address, e); setFailure(e); @@ -65,7 +65,7 @@ final class NetconfSessionPromise extends DefaultPromi @Override public synchronized boolean cancel(final boolean mayInterruptIfRunning) { if (super.cancel(mayInterruptIfRunning)) { - this.pending.cancel(mayInterruptIfRunning); + pending.cancel(mayInterruptIfRunning); return true; } @@ -75,71 +75,64 @@ final class NetconfSessionPromise extends DefaultPromi @Override public synchronized Promise setSuccess(final S result) { LOG.debug("Promise {} completed", this); - this.strategy.reconnectSuccessful(); + strategy.reconnectSuccessful(); return super.setSuccess(result); } - private class BootstrapConnectListener implements ChannelFutureListener { - @Override - public void operationComplete(final ChannelFuture cf) { - synchronized (NetconfSessionPromise.this) { + // Triggered when a connection attempt is resolved. + private synchronized void channelConnectComplete(final ChannelFuture cf) { + LOG.debug("Promise {} connection resolved", this); + checkState(pending.equals(cf)); + + /* + * The promise we gave out could have been cancelled, + * which cascades to the connect getting cancelled, + * but there is a slight race window, where the connect + * is already resolved, but the listener has not yet + * been notified -- cancellation at that point won't + * stop the notification arriving, so we have to close + * the race here. + */ + if (isCancelled()) { + if (cf.isSuccess()) { + LOG.debug("Closing channel for cancelled promise {}", this); + cf.channel().close(); + } + return; + } - LOG.debug("Promise {} connection resolved", NetconfSessionPromise.this); + if (cf.isSuccess()) { + LOG.debug("Promise {} connection successful", this); + return; + } + + LOG.debug("Attempt to connect to {} failed", address, cf.cause()); - // Triggered when a connection attempt is resolved. - Preconditions.checkState(NetconfSessionPromise.this.pending.equals(cf)); + final Future rf = strategy.scheduleReconnect(cf.cause()); + rf.addListener(new ReconnectingStrategyListener()); + pending = rf; + } + + private class ReconnectingStrategyListener implements FutureListener { + @Override + public void operationComplete(final Future sf) { + synchronized (NetconfSessionPromise.this) { + // Triggered when a connection attempt is to be made. + checkState(pending.equals(sf)); /* * The promise we gave out could have been cancelled, - * which cascades to the connect getting cancelled, - * but there is a slight race window, where the connect - * is already resolved, but the listener has not yet - * been notified -- cancellation at that point won't - * stop the notification arriving, so we have to close - * the race here. + * which cascades to the reconnect attempt getting + * cancelled, but there is a slight race window, where + * the reconnect attempt is already enqueued, but the + * listener has not yet been notified -- if cancellation + * happens at that point, we need to catch it here. */ - if (isCancelled()) { - if (cf.isSuccess()) { - LOG.debug("Closing channel for cancelled promise {}", NetconfSessionPromise.this); - cf.channel().close(); - } - return; - } - - if (cf.isSuccess()) { - LOG.debug("Promise {} connection successful", NetconfSessionPromise.this); - return; - } - - LOG.debug("Attempt to connect to {} failed", NetconfSessionPromise.this.address, cf.cause()); - - final Future rf = NetconfSessionPromise.this.strategy.scheduleReconnect(cf.cause()); - rf.addListener(new ReconnectingStrategyListener()); - NetconfSessionPromise.this.pending = rf; - } - } - - private class ReconnectingStrategyListener implements FutureListener { - @Override - public void operationComplete(final Future sf) { - synchronized (NetconfSessionPromise.this) { - // Triggered when a connection attempt is to be made. - Preconditions.checkState(NetconfSessionPromise.this.pending.equals(sf)); - - /* - * The promise we gave out could have been cancelled, - * which cascades to the reconnect attempt getting - * cancelled, but there is a slight race window, where - * the reconnect attempt is already enqueued, but the - * listener has not yet been notified -- if cancellation - * happens at that point, we need to catch it here. - */ - if (!isCancelled()) { - if (sf.isSuccess()) { - connect(); - } else { - setFailure(sf.cause()); - } + if (!isCancelled()) { + if (sf.isSuccess()) { + connect(); + } else { + setFailure(sf.cause()); } } }