Bug-6781: Inbound and outbound connection attempts from controller are not synchronized
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / protocol / BGPProtocolSessionPromise.java
index b3e5bba736f7cad8280f3046cdbda8f3ebb02c32..01718ebf9dcce688df9737c2fb4672234dee52e2 100644 (file)
@@ -18,8 +18,13 @@ import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.bgp.rib.impl.StrictBGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,19 +35,37 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
     private InetSocketAddress address;
     private final int retryTimer;
     private final Bootstrap bootstrap;
+    private final BGPPeerRegistry peerRegistry;
+    private final AutoCloseable listenerRegistration;
     @GuardedBy("this")
     private ChannelFuture pending;
+    @GuardedBy("this")
+    private boolean peerSessionPresent;
+    @GuardedBy("this")
+    private boolean connectSkipped;
+
 
-    public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap) {
+    public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap, BGPPeerRegistry peerRegistry) {
         super(GlobalEventExecutor.INSTANCE);
         this.address = Preconditions.checkNotNull(remoteAddress);
         this.retryTimer = retryTimer;
         this.bootstrap = Preconditions.checkNotNull(bootstrap);
+        this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
+        this.listenerRegistration = this.peerRegistry
+                .registerPeerSessionListener(new BGPProtocolSessionPromise.PeerRegistrySessionListenerImpl(this,
+                        StrictBGPPeerRegistry.getIpAddress(this.address)));
     }
 
     public synchronized void connect() {
-        final BGPProtocolSessionPromise lock = this;
+        if (this.peerSessionPresent) {
+            LOG.debug("Connection to {} already exists", this.address);
+            this.connectSkipped = true;
+            return;
+        } else {
+            this.connectSkipped = false;
+        }
 
+        final BGPProtocolSessionPromise lock = this;
         try {
             LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(CONNECT_TIMEOUT));
             if (this.address.isUnresolved()) {
@@ -72,6 +95,13 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
         loop.schedule(new Runnable() {
             @Override
             public void run() {
+                if (BGPProtocolSessionPromise.this.peerSessionPresent) {
+                    LOG.debug("Connection to {} already exists", BGPProtocolSessionPromise.this.address);
+                    BGPProtocolSessionPromise.this.connectSkipped = true;
+                    return;
+                } else {
+                    BGPProtocolSessionPromise.this.connectSkipped = false;
+                }
                 LOG.debug("Attempting to connect to {}", BGPProtocolSessionPromise.this.address);
                 final ChannelFuture reconnectFuture = BGPProtocolSessionPromise.this.bootstrap.connect();
                 reconnectFuture.addListener(new BootstrapConnectListener(lock));
@@ -83,7 +113,9 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
 
     @Override
     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        closePeerSessionListener();
         if (super.cancel(mayInterruptIfRunning)) {
+            Preconditions.checkNotNull(this.pending);
             this.pending.cancel(mayInterruptIfRunning);
             return true;
         } else {
@@ -91,6 +123,14 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
         }
     }
 
+    private void closePeerSessionListener() {
+        try {
+            this.listenerRegistration.close();
+        } catch (final Exception e) {
+            LOG.debug("Exception encountered while closing peer registry session listener registration", e);
+        }
+    }
+
     @Override
     public synchronized Promise<S> setSuccess(final S result) {
         LOG.debug("Promise {} completed", this);
@@ -123,4 +163,40 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
             }
         }
     }
-}
\ No newline at end of file
+
+    private class PeerRegistrySessionListenerImpl implements PeerRegistrySessionListener {
+        private final Object lock;
+        private final IpAddress peerAddress;
+
+        PeerRegistrySessionListenerImpl(final Object lock, final IpAddress peerAddress) {
+            this.lock = lock;
+            this.peerAddress = peerAddress;
+        }
+
+        @Override
+        public void onSessionCreated(@Nonnull final IpAddress ip) {
+            if (!ip.equals(this.peerAddress)) {
+                return;
+            }
+            BGPProtocolSessionPromise.LOG.debug("Callback for session creation with peer {} received", ip);
+            synchronized (this.lock) {
+                BGPProtocolSessionPromise.this.peerSessionPresent = true;
+            }
+        }
+
+        @Override
+        public void onSessionRemoved(@Nonnull final IpAddress ip) {
+            if (!ip.equals(this.peerAddress)) {
+                return;
+            }
+            BGPProtocolSessionPromise.LOG.debug("Callback for session removal with peer {} received", ip);
+            synchronized (this.lock) {
+                BGPProtocolSessionPromise.this.peerSessionPresent = false;
+                if (BGPProtocolSessionPromise.this.connectSkipped) {
+                    BGPProtocolSessionPromise.this.connect();
+                }
+            }
+        }
+    }
+
+}