Move protocol framework from BGPCEP project
[controller.git] / opendaylight / commons / protocol-framework / src / main / java / org / opendaylight / protocol / framework / ReconnectPromise.java
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
new file mode 100644 (file)
index 0000000..1fa6a81
--- /dev/null
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
+
+import com.google.common.base.Preconditions;
+
+final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+    private final AbstractDispatcher<S, L> dispatcher;
+    private final InetSocketAddress address;
+    private final ReconnectStrategyFactory strategyFactory;
+    private final ReconnectStrategy strategy;
+    private final PipelineInitializer<S> initializer;
+    private Future<?> pending;
+
+    private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
+
+    public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
+            final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
+            final PipelineInitializer<S> initializer) {
+        super(executor);
+        this.dispatcher = Preconditions.checkNotNull(dispatcher);
+        this.address = Preconditions.checkNotNull(address);
+        this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
+        this.strategy = Preconditions.checkNotNull(reestablishStrategy);
+        this.initializer = Preconditions.checkNotNull(initializer);
+    }
+
+    // FIXME: BUG-190: refactor
+
+    synchronized void connect() {
+        negotiationFinished.set(false);
+
+        final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+        final ReconnectStrategy rs = new ReconnectStrategy() {
+            @Override
+            public Future<Void> scheduleReconnect(final Throwable cause) {
+                return cs.scheduleReconnect(cause);
+            }
+
+            @Override
+            public void reconnectSuccessful() {
+                cs.reconnectSuccessful();
+            }
+
+            @Override
+            public int getConnectTimeout() throws Exception {
+                final int cst = cs.getConnectTimeout();
+                final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
+
+                if (cst == 0) {
+                    return rst;
+                }
+                if (rst == 0) {
+                    return cst;
+                }
+                return Math.min(cst, rst);
+            }
+        };
+
+        final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
+            @Override
+            public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
+                addChannelClosedListener(channel.closeFuture());
+                initializer.initializeChannel(channel, promise);
+            }
+        });
+
+        final Object lock = this;
+        this.pending = cf;
+
+        cf.addListener(new FutureListener<S>() {
+
+            @Override
+            public void operationComplete(final Future<S> future) {
+                synchronized (lock) {
+                    if (!future.isSuccess()) {
+                        final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
+
+                        if(rf == null) {
+                            // This should reflect: no more reconnecting strategies, enough
+                            // Currently all reconnect strategies fail with exception, should return null
+                            return;
+                        }
+
+                        ReconnectPromise.this.pending = rf;
+
+                        rf.addListener(new FutureListener<Void>() {
+                            @Override
+                            public void operationComplete(final Future<Void> sf) {
+                                synchronized (lock) {
+                                    /*
+                                     * The promise we gave out could have been cancelled,
+                                     * which cascades to the reconnect attempt getting
+                                     * cancelled, but there is a slight race window, where
+                                     * the reconnect attempt is already enqueued, but the
+                                     * listener has not yet been notified -- if cancellation
+                                     * happens at that point, we need to catch it here.
+                                     */
+                                    if (!isCancelled()) {
+                                        if (sf.isSuccess()) {
+                                            connect();
+                                        } else {
+                                            setFailure(sf.cause());
+                                        }
+                                    }
+                                }
+                            }
+                        });
+                    } else {
+                        /*
+                         *  FIXME: BUG-190: we have a slight race window with cancellation
+                         *         here. Analyze and define its semantics.
+                         */
+                        ReconnectPromise.this.strategy.reconnectSuccessful();
+                        negotiationFinished.set(true);
+                    }
+                }
+            }
+        });
+    }
+
+    private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
+
+    class ClosedChannelListener implements Closeable, FutureListener<Void> {
+
+        private final AtomicBoolean stop = new AtomicBoolean(false);
+
+        @Override
+        public void operationComplete(final Future<Void> future) throws Exception {
+            if (stop.get()) {
+                return;
+            }
+
+            // Start reconnecting crashed session after negotiation was successful
+            if (!negotiationFinished.get()) {
+                return;
+            }
+
+            connect();
+        }
+
+        @Override
+        public void close() {
+            this.stop.set(true);
+        }
+    }
+
+    private void addChannelClosedListener(final ChannelFuture channelFuture) {
+        channelFuture.addListener(closedChannelListener);
+    }
+
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        closedChannelListener.close();
+
+        if (super.cancel(mayInterruptIfRunning)) {
+            this.pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
+
+        return false;
+    }
+}