Merge "Fix channelInactive event handling in the netty pipeline for netconf."
[controller.git] / opendaylight / commons / protocol-framework / src / main / java / org / opendaylight / protocol / framework / ReconnectPromise.java
index 1fa6a8175352617aa8f87a56e90f6d391bdef029..aaec95a74b5886150c8d0bc4bb7d11a1a1570dbf 100644 (file)
  */
 package org.opendaylight.protocol.framework;
 
-import io.netty.channel.ChannelFuture;
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
-
-import java.io.Closeable;
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
+
     private final AbstractDispatcher<S, L> dispatcher;
     private final InetSocketAddress address;
     private final ReconnectStrategyFactory strategyFactory;
-    private final ReconnectStrategy strategy;
-    private final PipelineInitializer<S> initializer;
+    private final Bootstrap b;
+    private final AbstractDispatcher.PipelineInitializer<S> initializer;
     private Future<?> pending;
 
-    private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
-
     public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
-            final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
-            final PipelineInitializer<S> initializer) {
+                            final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b, final AbstractDispatcher.PipelineInitializer<S> initializer) {
         super(executor);
+        this.b = b;
+        this.initializer = Preconditions.checkNotNull(initializer);
         this.dispatcher = Preconditions.checkNotNull(dispatcher);
         this.address = Preconditions.checkNotNull(address);
         this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
-        this.strategy = Preconditions.checkNotNull(reestablishStrategy);
-        this.initializer = Preconditions.checkNotNull(initializer);
     }
 
-    // FIXME: BUG-190: refactor
-
     synchronized void connect() {
-        negotiationFinished.set(false);
-
         final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
-        final ReconnectStrategy rs = new ReconnectStrategy() {
-            @Override
-            public Future<Void> scheduleReconnect(final Throwable cause) {
-                return cs.scheduleReconnect(cause);
-            }
 
-            @Override
-            public void reconnectSuccessful() {
-                cs.reconnectSuccessful();
-            }
-
-            @Override
-            public int getConnectTimeout() throws Exception {
-                final int cst = cs.getConnectTimeout();
-                final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
-
-                if (cst == 0) {
-                    return rst;
-                }
-                if (rst == 0) {
-                    return cst;
-                }
-                return Math.min(cst, rst);
-            }
-        };
-
-        final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
+        // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
+        pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
             @Override
             public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
-                addChannelClosedListener(channel.closeFuture());
                 initializer.initializeChannel(channel, promise);
+                // add closed channel handler
+                // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+                // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+                // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+                channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
             }
         });
+    }
 
-        final Object lock = this;
-        this.pending = cf;
-
-        cf.addListener(new FutureListener<S>() {
-
-            @Override
-            public void operationComplete(final Future<S> future) {
-                synchronized (lock) {
-                    if (!future.isSuccess()) {
-                        final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
-
-                        if(rf == null) {
-                            // This should reflect: no more reconnecting strategies, enough
-                            // Currently all reconnect strategies fail with exception, should return null
-                            return;
-                        }
+    /**
+     *
+     * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
+     */
+    private boolean isInitialConnectFinished() {
+        Preconditions.checkNotNull(pending);
+        return pending.isDone() && pending.isSuccess();
+    }
 
-                        ReconnectPromise.this.pending = rf;
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            Preconditions.checkNotNull(pending);
+            this.pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
 
-                        rf.addListener(new FutureListener<Void>() {
-                            @Override
-                            public void operationComplete(final Future<Void> sf) {
-                                synchronized (lock) {
-                                    /*
-                                     * The promise we gave out could have been cancelled,
-                                     * which cascades to the reconnect attempt getting
-                                     * cancelled, but there is a slight race window, where
-                                     * the reconnect attempt is already enqueued, but the
-                                     * listener has not yet been notified -- if cancellation
-                                     * happens at that point, we need to catch it here.
-                                     */
-                                    if (!isCancelled()) {
-                                        if (sf.isSuccess()) {
-                                            connect();
-                                        } else {
-                                            setFailure(sf.cause());
-                                        }
-                                    }
-                                }
-                            }
-                        });
-                    } else {
-                        /*
-                         *  FIXME: BUG-190: we have a slight race window with cancellation
-                         *         here. Analyze and define its semantics.
-                         */
-                        ReconnectPromise.this.strategy.reconnectSuccessful();
-                        negotiationFinished.set(true);
-                    }
-                }
-            }
-        });
+        return false;
     }
 
-    private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
-
-    class ClosedChannelListener implements Closeable, FutureListener<Void> {
+    /**
+     * Channel handler that responds to channelInactive event and reconnects the session.
+     * Only if the promise was not canceled.
+     */
+    private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
+        private final ReconnectPromise<?, ?> promise;
 
-        private final AtomicBoolean stop = new AtomicBoolean(false);
+        public ClosedChannelHandler(final ReconnectPromise<?, ?> promise) {
+            this.promise = promise;
+        }
 
         @Override
-        public void operationComplete(final Future<Void> future) throws Exception {
-            if (stop.get()) {
+        public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+            // This is the ultimate channel inactive handler, not forwarding
+            if (promise.isCancelled()) {
                 return;
             }
 
-            // Start reconnecting crashed session after negotiation was successful
-            if (!negotiationFinished.get()) {
-                return;
+            if (promise.isInitialConnectFinished() == false) {
+                LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
             }
 
-            connect();
-        }
-
-        @Override
-        public void close() {
-            this.stop.set(true);
+            LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
+            promise.connect();
         }
     }
 
-    private void addChannelClosedListener(final ChannelFuture channelFuture) {
-        channelFuture.addListener(closedChannelListener);
-    }
-
-    @Override
-    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
-        closedChannelListener.close();
-
-        if (super.cancel(mayInterruptIfRunning)) {
-            this.pending.cancel(mayInterruptIfRunning);
-            return true;
-        }
-
-        return false;
-    }
 }