*/
package org.opendaylight.protocol.framework;
-import io.netty.channel.ChannelFuture;
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
-
-import java.io.Closeable;
import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
+
private final AbstractDispatcher<S, L> dispatcher;
private final InetSocketAddress address;
private final ReconnectStrategyFactory strategyFactory;
- private final ReconnectStrategy strategy;
- private final PipelineInitializer<S> initializer;
+ private final Bootstrap b;
+ private final AbstractDispatcher.PipelineInitializer<S> initializer;
private Future<?> pending;
- private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
-
public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
- final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
- final PipelineInitializer<S> initializer) {
+ final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b, final AbstractDispatcher.PipelineInitializer<S> initializer) {
super(executor);
+ this.b = b;
+ this.initializer = Preconditions.checkNotNull(initializer);
this.dispatcher = Preconditions.checkNotNull(dispatcher);
this.address = Preconditions.checkNotNull(address);
this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
- this.strategy = Preconditions.checkNotNull(reestablishStrategy);
- this.initializer = Preconditions.checkNotNull(initializer);
}
- // FIXME: BUG-190: refactor
-
synchronized void connect() {
- negotiationFinished.set(false);
-
final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
- final ReconnectStrategy rs = new ReconnectStrategy() {
- @Override
- public Future<Void> scheduleReconnect(final Throwable cause) {
- return cs.scheduleReconnect(cause);
- }
- @Override
- public void reconnectSuccessful() {
- cs.reconnectSuccessful();
- }
-
- @Override
- public int getConnectTimeout() throws Exception {
- final int cst = cs.getConnectTimeout();
- final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
-
- if (cst == 0) {
- return rst;
- }
- if (rst == 0) {
- return cst;
- }
- return Math.min(cst, rst);
- }
- };
-
- final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
+ // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
+ pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
- addChannelClosedListener(channel.closeFuture());
initializer.initializeChannel(channel, promise);
+
+ // add closed channel handler
+ channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
}
});
+ }
- final Object lock = this;
- this.pending = cf;
+ /**
+ *
+ * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
+ */
+ private boolean isInitialConnectFinished() {
+ Preconditions.checkNotNull(pending);
+ return pending.isDone() && pending.isSuccess();
+ }
- cf.addListener(new FutureListener<S>() {
+ @Override
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ if (super.cancel(mayInterruptIfRunning)) {
+ Preconditions.checkNotNull(pending);
+ this.pending.cancel(mayInterruptIfRunning);
+ return true;
+ }
- @Override
- public void operationComplete(final Future<S> future) {
- synchronized (lock) {
- if (!future.isSuccess()) {
- final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
-
- if(rf == null) {
- // This should reflect: no more reconnecting strategies, enough
- // Currently all reconnect strategies fail with exception, should return null
- return;
- }
-
- ReconnectPromise.this.pending = rf;
-
- rf.addListener(new FutureListener<Void>() {
- @Override
- public void operationComplete(final Future<Void> sf) {
- synchronized (lock) {
- /*
- * The promise we gave out could have been cancelled,
- * which cascades to the reconnect attempt getting
- * cancelled, but there is a slight race window, where
- * the reconnect attempt is already enqueued, but the
- * listener has not yet been notified -- if cancellation
- * happens at that point, we need to catch it here.
- */
- if (!isCancelled()) {
- if (sf.isSuccess()) {
- connect();
- } else {
- setFailure(sf.cause());
- }
- }
- }
- }
- });
- } else {
- /*
- * FIXME: BUG-190: we have a slight race window with cancellation
- * here. Analyze and define its semantics.
- */
- ReconnectPromise.this.strategy.reconnectSuccessful();
- negotiationFinished.set(true);
- }
- }
- }
- });
+ return false;
}
- private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
-
- class ClosedChannelListener implements Closeable, FutureListener<Void> {
+ /**
+ * Channel handler that responds to channelInactive event and reconnects the session.
+ * Only if the initial connection was successfully established and promise was not canceled.
+ */
+ private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
+ private final ReconnectPromise<?, ?> promise;
- private final AtomicBoolean stop = new AtomicBoolean(false);
+ public ClosedChannelHandler(final ReconnectPromise<?, ?> promise) {
+ this.promise = promise;
+ }
@Override
- public void operationComplete(final Future<Void> future) throws Exception {
- if (stop.get()) {
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ if (promise.isCancelled()) {
return;
}
- // Start reconnecting crashed session after negotiation was successful
- if (!negotiationFinished.get()) {
+ // Check if initial connection was fully finished. If the session was dropped during negotiation, reconnect will not happen.
+ // Session can be dropped during negotiation on purpose by the client side and would make no sense to initiate reconnect
+ if (promise.isInitialConnectFinished() == false) {
return;
}
- connect();
- }
-
- @Override
- public void close() {
- this.stop.set(true);
+ LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
+ promise.connect();
}
}
- private void addChannelClosedListener(final ChannelFuture channelFuture) {
- channelFuture.addListener(closedChannelListener);
- }
-
- @Override
- public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
- closedChannelListener.close();
-
- if (super.cancel(mayInterruptIfRunning)) {
- this.pending.cancel(mayInterruptIfRunning);
- return true;
- }
-
- return false;
- }
}