Fix reconnect promise failing to restart during negotiation
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / ReconnectPromise.java
index 00c80d47321d6d43d3675573bdc6274f06a3d623..193c209900f67b1971a34cf3c202b5b2e8080f61 100644 (file)
@@ -7,11 +7,16 @@
  */
 package org.opendaylight.protocol.framework;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
 
+import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
 
@@ -25,6 +30,8 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
        private final PipelineInitializer<S> initializer;
        private Future<?> pending;
 
+       private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
+
        public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
                        final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
                        final PipelineInitializer<S> initializer) {
@@ -36,7 +43,11 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
                this.initializer = Preconditions.checkNotNull(initializer);
        }
 
+       // TODO rafactor
+
        synchronized void connect() {
+               negotiationFinished.set(false);
+
                final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
                final ReconnectStrategy rs = new ReconnectStrategy() {
                        @Override
@@ -64,17 +75,31 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
                        }
                };
 
-               final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.initializer);
+               final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
+                       @Override
+                       public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
+                               addChannelClosedListener(channel.closeFuture());
+                               initializer.initializeChannel(channel, promise);
+                       }
+               });
 
                final Object lock = this;
                this.pending = cf;
 
                cf.addListener(new FutureListener<S>() {
+
                        @Override
                        public void operationComplete(final Future<S> future) {
                                synchronized (lock) {
                                        if (!future.isSuccess()) {
                                                final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
+
+                                               if(rf == null) {
+                                                       // This should reflect: no more reconnecting strategies, enough
+                                                       // Currently all reconnect strategies fail with exception, should return null
+                                                       return;
+                                               }
+
                                                ReconnectPromise.this.pending = rf;
 
                                                rf.addListener(new FutureListener<Void>() {
@@ -105,15 +130,47 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
                                                 *         here. Analyze and define its semantics.
                                                 */
                                                ReconnectPromise.this.strategy.reconnectSuccessful();
-                                               setSuccess(null);
+                                               negotiationFinished.set(true);
                                        }
                                }
                        }
                });
        }
 
+       private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
+
+       class ClosedChannelListener implements Closeable, FutureListener<Void> {
+
+               private final AtomicBoolean stop = new AtomicBoolean(false);
+
+               @Override
+               public void operationComplete(final Future<Void> future) throws Exception {
+                       if (stop.get()) {
+                               return;
+                       }
+
+                       // Start reconnecting crashed session after negotiation was successful
+                       if (!negotiationFinished.get()) {
+                               return;
+                       }
+
+                       connect();
+               }
+
+               @Override
+               public void close() {
+                       this.stop.set(true);
+               }
+       }
+
+       private void addChannelClosedListener(final ChannelFuture channelFuture) {
+               channelFuture.addListener(closedChannelListener);
+       }
+
        @Override
        public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+               closedChannelListener.close();
+
                if (super.cancel(mayInterruptIfRunning)) {
                        this.pending.cancel(mayInterruptIfRunning);
                        return true;