X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fcommons%2Fprotocol-framework%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fframework%2FReconnectPromise.java;h=ddf5d438f3b3f74d1e915c76be78e2fd85b4fbc8;hb=refs%2Fchanges%2F12%2F56912%2F2;hp=1fa6a8175352617aa8f87a56e90f6d391bdef029;hpb=b369c0185a99952c3609aad40ebeba3e4be8c5b2;p=controller.git diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java index 1fa6a81753..ddf5d438f3 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java @@ -7,176 +7,112 @@ */ package org.opendaylight.protocol.framework; -import io.netty.channel.ChannelFuture; +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; - -import java.io.Closeable; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer; - -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +@Deprecated final class ReconnectPromise, L extends SessionListener> extends DefaultPromise { + private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class); + private final AbstractDispatcher dispatcher; private final InetSocketAddress address; private final ReconnectStrategyFactory strategyFactory; - private final ReconnectStrategy strategy; - private final PipelineInitializer initializer; + private final Bootstrap b; + private final AbstractDispatcher.PipelineInitializer initializer; private Future pending; - private final AtomicBoolean negotiationFinished = new AtomicBoolean(false); - public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher dispatcher, final InetSocketAddress address, - final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy, - final PipelineInitializer initializer) { + final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b, final AbstractDispatcher.PipelineInitializer initializer) { super(executor); + this.b = b; + this.initializer = Preconditions.checkNotNull(initializer); this.dispatcher = Preconditions.checkNotNull(dispatcher); this.address = Preconditions.checkNotNull(address); this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory); - this.strategy = Preconditions.checkNotNull(reestablishStrategy); - this.initializer = Preconditions.checkNotNull(initializer); } - // FIXME: BUG-190: refactor - synchronized void connect() { - negotiationFinished.set(false); - final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy(); - final ReconnectStrategy rs = new ReconnectStrategy() { - @Override - public Future scheduleReconnect(final Throwable cause) { - return cs.scheduleReconnect(cause); - } - @Override - public void reconnectSuccessful() { - cs.reconnectSuccessful(); - } - - @Override - public int getConnectTimeout() throws Exception { - final int cst = cs.getConnectTimeout(); - final int rst = ReconnectPromise.this.strategy.getConnectTimeout(); - - if (cst == 0) { - return rst; - } - if (rst == 0) { - return cst; - } - return Math.min(cst, rst); - } - }; - - final Future cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer() { + // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts + pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer() { @Override public void initializeChannel(final SocketChannel channel, final Promise promise) { - addChannelClosedListener(channel.closeFuture()); initializer.initializeChannel(channel, promise); + // add closed channel handler + // This handler has to be added as last channel handler and the channel inactive event has to be caught by it + // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work + // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started + channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this)); } }); - final Object lock = this; - this.pending = cf; - - cf.addListener(new FutureListener() { - + pending.addListener(new GenericFutureListener>() { @Override - public void operationComplete(final Future future) { - synchronized (lock) { - if (!future.isSuccess()) { - final Future rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause()); - - if(rf == null) { - // This should reflect: no more reconnecting strategies, enough - // Currently all reconnect strategies fail with exception, should return null - return; - } - - ReconnectPromise.this.pending = rf; - - rf.addListener(new FutureListener() { - @Override - public void operationComplete(final Future sf) { - synchronized (lock) { - /* - * The promise we gave out could have been cancelled, - * which cascades to the reconnect attempt getting - * cancelled, but there is a slight race window, where - * the reconnect attempt is already enqueued, but the - * listener has not yet been notified -- if cancellation - * happens at that point, we need to catch it here. - */ - if (!isCancelled()) { - if (sf.isSuccess()) { - connect(); - } else { - setFailure(sf.cause()); - } - } - } - } - }); - } else { - /* - * FIXME: BUG-190: we have a slight race window with cancellation - * here. Analyze and define its semantics. - */ - ReconnectPromise.this.strategy.reconnectSuccessful(); - negotiationFinished.set(true); - } + public void operationComplete(final Future future) throws Exception { + if (!future.isSuccess()) { + ReconnectPromise.this.setFailure(future.cause()); } } }); } - private final ClosedChannelListener closedChannelListener = new ClosedChannelListener(); + /** + * + * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed + */ + private boolean isInitialConnectFinished() { + Preconditions.checkNotNull(pending); + return pending.isDone() && pending.isSuccess(); + } + + @Override + public synchronized boolean cancel(final boolean mayInterruptIfRunning) { + if (super.cancel(mayInterruptIfRunning)) { + Preconditions.checkNotNull(pending); + this.pending.cancel(mayInterruptIfRunning); + return true; + } - class ClosedChannelListener implements Closeable, FutureListener { + return false; + } + + /** + * Channel handler that responds to channelInactive event and reconnects the session. + * Only if the promise was not canceled. + */ + private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter { + private final ReconnectPromise promise; - private final AtomicBoolean stop = new AtomicBoolean(false); + public ClosedChannelHandler(final ReconnectPromise promise) { + this.promise = promise; + } @Override - public void operationComplete(final Future future) throws Exception { - if (stop.get()) { + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + // This is the ultimate channel inactive handler, not forwarding + if (promise.isCancelled()) { return; } - // Start reconnecting crashed session after negotiation was successful - if (!negotiationFinished.get()) { - return; + if (promise.isInitialConnectFinished() == false) { + LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address); } - connect(); - } - - @Override - public void close() { - this.stop.set(true); + LOG.debug("Reconnecting after connection to {} was dropped", promise.address); + promise.connect(); } } - private void addChannelClosedListener(final ChannelFuture channelFuture) { - channelFuture.addListener(closedChannelListener); - } - - @Override - public synchronized boolean cancel(final boolean mayInterruptIfRunning) { - closedChannelListener.close(); - - if (super.cancel(mayInterruptIfRunning)) { - this.pending.cancel(mayInterruptIfRunning); - return true; - } - - return false; - } }