Bug-6781: Inbound and outbound connection attempts from controller are not synchronized
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / protocol / BGPReconnectPromise.java
index 4123940afbe33b0a200757b1ca64ce4b65aea089..e69f97bf4c9fb42fd40b17fa19465569a91c1840 100644 (file)
@@ -20,6 +20,7 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
 import org.slf4j.Logger;
@@ -31,22 +32,29 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     private final InetSocketAddress address;
     private final int retryTimer;
     private final Bootstrap bootstrap;
+    private final BGPPeerRegistry peerRegistry;
     private final ChannelPipelineInitializer initializer;
     private BGPProtocolSessionPromise<S> pending;
 
     public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
                                final int retryTimer, final Bootstrap bootstrap,
+                               final BGPPeerRegistry peerRegistry,
                                final ChannelPipelineInitializer initializer) {
         super(executor);
         this.bootstrap = bootstrap;
         this.initializer = Preconditions.checkNotNull(initializer);
         this.address = Preconditions.checkNotNull(address);
         this.retryTimer = retryTimer;
+        this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
     }
 
     public synchronized void connect() {
+        if (this.pending != null) {
+            this.pending.cancel(true);
+        }
+
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
-        this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, new ChannelPipelineInitializer<S>() {
+        this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, new ChannelPipelineInitializer<S>() {
             @Override
             public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
                 BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
@@ -61,7 +69,7 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
         this.pending.addListener(new GenericFutureListener<Future<Object>>() {
             @Override
             public void operationComplete(final Future<Object> future) throws Exception {
-                if (!future.isSuccess()) {
+                if (!future.isSuccess() && !BGPReconnectPromise.this.isDone()) {
                     BGPReconnectPromise.this.setFailure(future.cause());
                 }
             }
@@ -69,8 +77,8 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     }
 
     public BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
-                                  final ChannelPipelineInitializer initializer) {
-        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap);
+            final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer initializer) {
+        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap, peerRegistry);
         final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(final SocketChannel channel) {
@@ -92,6 +100,11 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
         return this.pending.isDone() && this.pending.isSuccess();
     }
 
+    private void reconnect() {
+        Preconditions.checkNotNull(this.pending);
+        this.pending.reconnect();
+    }
+
     @Override
     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
         if (super.cancel(mayInterruptIfRunning)) {
@@ -122,7 +135,7 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
 
             if (!this.promise.isInitialConnectFinished()) {
                 LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
-                this.promise.pending.reconnect();
+                this.promise.reconnect();
                 return;
             }