Bug-6781: Inbound and outbound connection attempts from controller are not synchronized 06/47106/3
authorAjay <ajayl.bro@gmail.com>
Wed, 19 Oct 2016 06:37:57 +0000 (06:37 +0000)
committerMilos Fabian <milfabia@cisco.com>
Sat, 22 Oct 2016 07:49:54 +0000 (07:49 +0000)
- created new peer session listener registry in BGPPeerRegistry for the
  outbound connection establishment logic to get notified when new peer
  session is created or destroyed
- updated outbound connection establishment logic to attempt a connection
  only when no existing session is present
- updated unit-tests

Change-Id: Ie6afb79e290c1a3c98d82af87febd6851ad200c2
Signed-off-by: Ajay <ajayl.bro@gmail.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java [changed mode: 0755->0644]
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/StrictBGPPeerRegistry.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPProtocolSessionPromise.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPReconnectPromise.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/spi/BGPPeerRegistry.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/spi/PeerRegistrySessionListener.java [new file with mode: 0644]
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/StrictBGPPeerRegistryTest.java

old mode 100755 (executable)
new mode 100644 (file)
index 074c31b..f48173a
@@ -77,7 +77,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(listener);
         final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
 
-        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(remoteAddress, retryTimer, clientBootStrap);
+        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(remoteAddress, retryTimer, clientBootStrap, listener);
         clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
         sessionPromise.connect();
         LOG.debug("Client created.");
@@ -134,7 +134,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry);
         final Bootstrap bootstrap = createClientBootStrap(keys, this.workerGroup);
         final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, remoteAddress,
-                retryTimer, bootstrap, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
+                retryTimer, bootstrap, peerRegistry, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
         reconnectPromise.connect();
         return reconnectPromise;
     }
index 3924a63de3549b64c54f13d3504c8819a8c9aa08..541da34782198d94fcf165aac09fc67091f64d50 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.protocol.bgp.parser.impl.message.open.As4CapabilityHandl
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
 import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistryListener;
+import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
@@ -71,6 +72,8 @@ public final class StrictBGPPeerRegistry implements BGPPeerRegistry {
     private final Map<IpAddress, BGPSessionPreferences> peerPreferences = Maps.newHashMap();
     @GuardedBy("this")
     private final Set<PeerRegistryListener> listeners = new HashSet<>();
+    @GuardedBy("this")
+    private final Set<PeerRegistrySessionListener> sessionListeners = new HashSet<>();
 
     public static BGPPeerRegistry instance() {
         return GLOBAL;
@@ -104,6 +107,9 @@ public final class StrictBGPPeerRegistry implements BGPPeerRegistry {
     public synchronized void removePeerSession(final IpAddress ip) {
         Preconditions.checkNotNull(ip);
         this.sessionIds.remove(ip);
+        for (final PeerRegistrySessionListener peerRegistrySessionListener : this.sessionListeners) {
+            peerRegistrySessionListener.onSessionRemoved(ip);
+        }
     }
 
     @Override
@@ -183,6 +189,9 @@ public final class StrictBGPPeerRegistry implements BGPPeerRegistry {
 
         // Map session id to peer IP address
         this.sessionIds.put(ip, currentConnection);
+        for (final PeerRegistrySessionListener peerRegistrySessionListener : this.sessionListeners) {
+            peerRegistrySessionListener.onSessionCreated(ip);
+        }
         return p;
     }
 
@@ -352,4 +361,20 @@ public final class StrictBGPPeerRegistry implements BGPPeerRegistry {
             }
         };
     }
-}
\ No newline at end of file
+
+    @Override
+    public synchronized AutoCloseable registerPeerSessionListener(final PeerRegistrySessionListener listener) {
+        this.sessionListeners.add(listener);
+        for (final IpAddress ipAddress : this.sessionIds.keySet()) {
+            listener.onSessionCreated(ipAddress);
+        }
+        return new AbstractRegistration() {
+            @Override
+            protected void removeRegistration() {
+                synchronized (StrictBGPPeerRegistry.this) {
+                    StrictBGPPeerRegistry.this.sessionListeners.remove(listener);
+                }
+            }
+        };
+    }
+}
index b3e5bba736f7cad8280f3046cdbda8f3ebb02c32..01718ebf9dcce688df9737c2fb4672234dee52e2 100644 (file)
@@ -18,8 +18,13 @@ import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.bgp.rib.impl.StrictBGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,19 +35,37 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
     private InetSocketAddress address;
     private final int retryTimer;
     private final Bootstrap bootstrap;
+    private final BGPPeerRegistry peerRegistry;
+    private final AutoCloseable listenerRegistration;
     @GuardedBy("this")
     private ChannelFuture pending;
+    @GuardedBy("this")
+    private boolean peerSessionPresent;
+    @GuardedBy("this")
+    private boolean connectSkipped;
+
 
-    public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap) {
+    public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap, BGPPeerRegistry peerRegistry) {
         super(GlobalEventExecutor.INSTANCE);
         this.address = Preconditions.checkNotNull(remoteAddress);
         this.retryTimer = retryTimer;
         this.bootstrap = Preconditions.checkNotNull(bootstrap);
+        this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
+        this.listenerRegistration = this.peerRegistry
+                .registerPeerSessionListener(new BGPProtocolSessionPromise.PeerRegistrySessionListenerImpl(this,
+                        StrictBGPPeerRegistry.getIpAddress(this.address)));
     }
 
     public synchronized void connect() {
-        final BGPProtocolSessionPromise lock = this;
+        if (this.peerSessionPresent) {
+            LOG.debug("Connection to {} already exists", this.address);
+            this.connectSkipped = true;
+            return;
+        } else {
+            this.connectSkipped = false;
+        }
 
+        final BGPProtocolSessionPromise lock = this;
         try {
             LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(CONNECT_TIMEOUT));
             if (this.address.isUnresolved()) {
@@ -72,6 +95,13 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
         loop.schedule(new Runnable() {
             @Override
             public void run() {
+                if (BGPProtocolSessionPromise.this.peerSessionPresent) {
+                    LOG.debug("Connection to {} already exists", BGPProtocolSessionPromise.this.address);
+                    BGPProtocolSessionPromise.this.connectSkipped = true;
+                    return;
+                } else {
+                    BGPProtocolSessionPromise.this.connectSkipped = false;
+                }
                 LOG.debug("Attempting to connect to {}", BGPProtocolSessionPromise.this.address);
                 final ChannelFuture reconnectFuture = BGPProtocolSessionPromise.this.bootstrap.connect();
                 reconnectFuture.addListener(new BootstrapConnectListener(lock));
@@ -83,7 +113,9 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
 
     @Override
     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        closePeerSessionListener();
         if (super.cancel(mayInterruptIfRunning)) {
+            Preconditions.checkNotNull(this.pending);
             this.pending.cancel(mayInterruptIfRunning);
             return true;
         } else {
@@ -91,6 +123,14 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
         }
     }
 
+    private void closePeerSessionListener() {
+        try {
+            this.listenerRegistration.close();
+        } catch (final Exception e) {
+            LOG.debug("Exception encountered while closing peer registry session listener registration", e);
+        }
+    }
+
     @Override
     public synchronized Promise<S> setSuccess(final S result) {
         LOG.debug("Promise {} completed", this);
@@ -123,4 +163,40 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
             }
         }
     }
-}
\ No newline at end of file
+
+    private class PeerRegistrySessionListenerImpl implements PeerRegistrySessionListener {
+        private final Object lock;
+        private final IpAddress peerAddress;
+
+        PeerRegistrySessionListenerImpl(final Object lock, final IpAddress peerAddress) {
+            this.lock = lock;
+            this.peerAddress = peerAddress;
+        }
+
+        @Override
+        public void onSessionCreated(@Nonnull final IpAddress ip) {
+            if (!ip.equals(this.peerAddress)) {
+                return;
+            }
+            BGPProtocolSessionPromise.LOG.debug("Callback for session creation with peer {} received", ip);
+            synchronized (this.lock) {
+                BGPProtocolSessionPromise.this.peerSessionPresent = true;
+            }
+        }
+
+        @Override
+        public void onSessionRemoved(@Nonnull final IpAddress ip) {
+            if (!ip.equals(this.peerAddress)) {
+                return;
+            }
+            BGPProtocolSessionPromise.LOG.debug("Callback for session removal with peer {} received", ip);
+            synchronized (this.lock) {
+                BGPProtocolSessionPromise.this.peerSessionPresent = false;
+                if (BGPProtocolSessionPromise.this.connectSkipped) {
+                    BGPProtocolSessionPromise.this.connect();
+                }
+            }
+        }
+    }
+
+}
index 4123940afbe33b0a200757b1ca64ce4b65aea089..e69f97bf4c9fb42fd40b17fa19465569a91c1840 100644 (file)
@@ -20,6 +20,7 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
 import org.slf4j.Logger;
@@ -31,22 +32,29 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     private final InetSocketAddress address;
     private final int retryTimer;
     private final Bootstrap bootstrap;
+    private final BGPPeerRegistry peerRegistry;
     private final ChannelPipelineInitializer initializer;
     private BGPProtocolSessionPromise<S> pending;
 
     public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
                                final int retryTimer, final Bootstrap bootstrap,
+                               final BGPPeerRegistry peerRegistry,
                                final ChannelPipelineInitializer initializer) {
         super(executor);
         this.bootstrap = bootstrap;
         this.initializer = Preconditions.checkNotNull(initializer);
         this.address = Preconditions.checkNotNull(address);
         this.retryTimer = retryTimer;
+        this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
     }
 
     public synchronized void connect() {
+        if (this.pending != null) {
+            this.pending.cancel(true);
+        }
+
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
-        this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, new ChannelPipelineInitializer<S>() {
+        this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, new ChannelPipelineInitializer<S>() {
             @Override
             public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
                 BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
@@ -61,7 +69,7 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
         this.pending.addListener(new GenericFutureListener<Future<Object>>() {
             @Override
             public void operationComplete(final Future<Object> future) throws Exception {
-                if (!future.isSuccess()) {
+                if (!future.isSuccess() && !BGPReconnectPromise.this.isDone()) {
                     BGPReconnectPromise.this.setFailure(future.cause());
                 }
             }
@@ -69,8 +77,8 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     }
 
     public BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
-                                  final ChannelPipelineInitializer initializer) {
-        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap);
+            final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer initializer) {
+        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap, peerRegistry);
         final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(final SocketChannel channel) {
@@ -92,6 +100,11 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
         return this.pending.isDone() && this.pending.isSuccess();
     }
 
+    private void reconnect() {
+        Preconditions.checkNotNull(this.pending);
+        this.pending.reconnect();
+    }
+
     @Override
     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
         if (super.cancel(mayInterruptIfRunning)) {
@@ -122,7 +135,7 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
 
             if (!this.promise.isInitialConnectFinished()) {
                 LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
-                this.promise.pending.reconnect();
+                this.promise.reconnect();
                 return;
             }
 
index 0dcc142b2f673d7439db8cbab419c74842ae91cc..f71ce640a666dd9605167a5465d5f80f38e4af67 100644 (file)
@@ -84,4 +84,14 @@ public interface BGPPeerRegistry extends AutoCloseable {
      */
     @Nonnull AutoCloseable registerPeerRegisterListener(@Nonnull PeerRegistryListener listener);
 
+    /**
+     * Register PeerRegistrySessionListener, which listens to the changes in sessions
+     * of peers in peer registry (create session, remove session). After registration,
+     * an initial drop is provided by calling onSessionCreated().
+     *
+     * @param listener The PeerRegistrySessionListener to be registered.
+     * @return Registration ticked, used for closing of registration.
+     */
+    @Nonnull AutoCloseable registerPeerSessionListener(PeerRegistrySessionListener listener);
+
 }
diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/spi/PeerRegistrySessionListener.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/spi/PeerRegistrySessionListener.java
new file mode 100644 (file)
index 0000000..604738d
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bgp.rib.impl.spi;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+
+/**
+ * Listens to the session changes for peers in a PeerRegisty.
+ *
+ */
+public interface PeerRegistrySessionListener {
+
+    /**
+     * Invoked when new peer session is created.
+     * @param ip The peer's IP address.
+     */
+    void onSessionCreated(@Nonnull IpAddress ip);
+
+    /**
+     * Invoked when peer session is removed.
+     * @param ip The peer's IP address.
+     */
+    void onSessionRemoved(@Nonnull IpAddress ip);
+
+}
index d7b015909a1b1907446cce6851953f93241d7e0e..905c802e895e31d63def00d31130cebb9583a81c 100644 (file)
@@ -23,6 +23,7 @@ import org.mockito.Mockito;
 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
 import org.opendaylight.protocol.bgp.parser.BGPError;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
+import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
@@ -70,6 +71,13 @@ public class StrictBGPPeerRegistryTest {
         return mock;
     }
 
+    private static PeerRegistrySessionListener getMockSessionListener() {
+        final PeerRegistrySessionListener mock = Mockito.mock(PeerRegistrySessionListener.class);
+        Mockito.doNothing().when(mock).onSessionCreated(Mockito.any(IpAddress.class));
+        Mockito.doNothing().when(mock).onSessionRemoved(Mockito.any(IpAddress.class));
+        return mock;
+    }
+
     @Test
     public void testIpAddressConstruction() throws Exception {
         final InetSocketAddress adr = new InetSocketAddress("127.0.0.1", 179);
@@ -201,4 +209,44 @@ public class StrictBGPPeerRegistryTest {
         }
         fail("Peer AS number mismatch");
     }
+
+    @Test
+    public void testRegisterPeerSessionListener() throws Exception {
+        final PeerRegistrySessionListener sessionListener1 = getMockSessionListener();
+        this.peerRegistry.registerPeerSessionListener(sessionListener1);
+
+        final PeerRegistrySessionListener sessionListener2 = getMockSessionListener();
+        this.peerRegistry.registerPeerSessionListener(sessionListener2);
+
+        this.peerRegistry.addPeer(REMOTE_IP, this.peer1, this.mockPreferences);
+        this.peerRegistry.getPeer(REMOTE_IP, FROM, TO, this.classicOpen);
+        Mockito.verify(sessionListener1, Mockito.times(1)).onSessionCreated(REMOTE_IP);
+        Mockito.verify(sessionListener2, Mockito.times(1)).onSessionCreated(REMOTE_IP);
+
+        this.peerRegistry.removePeerSession(REMOTE_IP);
+        Mockito.verify(sessionListener1, Mockito.times(1)).onSessionRemoved(REMOTE_IP);
+        Mockito.verify(sessionListener2, Mockito.times(1)).onSessionRemoved(REMOTE_IP);
+    }
+
+    @Test
+    public void testClosePeerSessionOneListener() throws Exception {
+        final PeerRegistrySessionListener sessionListener1 = getMockSessionListener();
+        final AutoCloseable registration1 = this.peerRegistry.registerPeerSessionListener(sessionListener1);
+
+        final PeerRegistrySessionListener sessionListener2 = getMockSessionListener();
+        this.peerRegistry.registerPeerSessionListener(sessionListener2);
+
+        this.peerRegistry.addPeer(REMOTE_IP, this.peer1, this.mockPreferences);
+        this.peerRegistry.getPeer(REMOTE_IP, FROM, TO, this.classicOpen);
+        this.peerRegistry.removePeerSession(REMOTE_IP);
+
+        registration1.close();
+        this.peerRegistry.getPeer(REMOTE_IP, FROM, TO, this.classicOpen);
+        this.peerRegistry.removePeerSession(REMOTE_IP);
+
+        Mockito.verify(sessionListener1, Mockito.times(1)).onSessionCreated(REMOTE_IP);
+        Mockito.verify(sessionListener2, Mockito.times(2)).onSessionCreated(REMOTE_IP);
+        Mockito.verify(sessionListener1, Mockito.times(1)).onSessionRemoved(REMOTE_IP);
+        Mockito.verify(sessionListener2, Mockito.times(2)).onSessionRemoved(REMOTE_IP);
+    }
 }