X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fcommons%2Fprotocol-framework%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fframework%2FProtocolSessionPromise.java;h=494ccf8ec4cb5d305dad89d9cf54ed41ab3f3263;hp=c54bf84987433f20b8fbadc28e70f12a236bc02c;hb=6ca44d2095f0887508dd32f0174058a627eff4f9;hpb=b369c0185a99952c3609aad40ebeba3e4be8c5b2 diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java index c54bf84987..494ccf8ec4 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java @@ -7,6 +7,7 @@ */ package org.opendaylight.protocol.framework; +import com.google.common.base.Preconditions; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -16,22 +17,18 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; - import java.net.InetSocketAddress; - import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - +@Deprecated @ThreadSafe final class ProtocolSessionPromise> extends DefaultPromise { private static final Logger LOG = LoggerFactory.getLogger(ProtocolSessionPromise.class); private final ReconnectStrategy strategy; - private final InetSocketAddress address; + private InetSocketAddress address; private final Bootstrap b; @GuardedBy("this") @@ -53,72 +50,16 @@ final class ProtocolSessionPromise> extends Default LOG.debug("Promise {} attempting connect for {}ms", lock, timeout); + if(this.address.isUnresolved()) { + this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort()); + } this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); - this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture cf) throws Exception { - synchronized (lock) { - - LOG.debug("Promise {} connection resolved", lock); - - // Triggered when a connection attempt is resolved. - Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf)); - - /* - * The promise we gave out could have been cancelled, - * which cascades to the connect getting cancelled, - * but there is a slight race window, where the connect - * is already resolved, but the listener has not yet - * been notified -- cancellation at that point won't - * stop the notification arriving, so we have to close - * the race here. - */ - if (isCancelled()) { - if (cf.isSuccess()) { - LOG.debug("Closing channel for cancelled promise {}", lock); - cf.channel().close(); - } - return; - } - - if (!cf.isSuccess()) { - LOG.info("Attempt to connect to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause()); - final Future rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause()); - rf.addListener(new FutureListener() { - @Override - public void operationComplete(final Future sf) { - synchronized (lock) { - // Triggered when a connection attempt is to be made. - Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf)); - - /* - * The promise we gave out could have been cancelled, - * which cascades to the reconnect attempt getting - * cancelled, but there is a slight race window, where - * the reconnect attempt is already enqueued, but the - * listener has not yet been notified -- if cancellation - * happens at that point, we need to catch it here. - */ - if (!isCancelled()) { - if (sf.isSuccess()) { - connect(); - } else { - setFailure(sf.cause()); - } - } - } - } - }); - - ProtocolSessionPromise.this.pending = rf; - } else { - LOG.debug("Promise {} connection successful", lock); - } - } - } - }); + final ChannelFuture connectFuture = this.b.connect(this.address); + // Add listener that attempts reconnect by invoking this method again. + connectFuture.addListener(new BootstrapConnectListener(lock)); + this.pending = connectFuture; } catch (final Exception e) { - LOG.info("Failed to connect to {}", e); + LOG.info("Failed to connect to {}", address, e); setFailure(e); } } @@ -139,4 +80,79 @@ final class ProtocolSessionPromise> extends Default this.strategy.reconnectSuccessful(); return super.setSuccess(result); } + + private class BootstrapConnectListener implements ChannelFutureListener { + private final Object lock; + + public BootstrapConnectListener(final Object lock) { + this.lock = lock; + } + + @Override + public void operationComplete(final ChannelFuture cf) throws Exception { + synchronized (lock) { + + LOG.debug("Promise {} connection resolved", lock); + + // Triggered when a connection attempt is resolved. + Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf)); + + /* + * The promise we gave out could have been cancelled, + * which cascades to the connect getting cancelled, + * but there is a slight race window, where the connect + * is already resolved, but the listener has not yet + * been notified -- cancellation at that point won't + * stop the notification arriving, so we have to close + * the race here. + */ + if (isCancelled()) { + if (cf.isSuccess()) { + LOG.debug("Closing channel for cancelled promise {}", lock); + cf.channel().close(); + } + return; + } + + if(cf.isSuccess()) { + LOG.debug("Promise {} connection successful", lock); + return; + } + + LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause()); + + final Future rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause()); + rf.addListener(new ReconnectingStrategyListener()); + ProtocolSessionPromise.this.pending = rf; + } + } + + private class ReconnectingStrategyListener implements FutureListener { + @Override + public void operationComplete(final Future sf) { + synchronized (lock) { + // Triggered when a connection attempt is to be made. + Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf)); + + /* + * The promise we gave out could have been cancelled, + * which cascades to the reconnect attempt getting + * cancelled, but there is a slight race window, where + * the reconnect attempt is already enqueued, but the + * listener has not yet been notified -- if cancellation + * happens at that point, we need to catch it here. + */ + if (!isCancelled()) { + if (sf.isSuccess()) { + connect(); + } else { + setFailure(sf.cause()); + } + } + } + } + } + + } + }