BUG-7222: Make BGP DS clean up asynchronous 74/51974/1
authorClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Wed, 15 Feb 2017 08:58:38 +0000 (09:58 +0100)
committerClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Thu, 16 Feb 2017 17:17:46 +0000 (18:17 +0100)
Close of the service must be done async, otherwise
will bock the thread.
Fix by make BGP DS clean up asynchronous

Change-Id: Icb19ef7b7f118fbc6b07407d1df2cb5f6cffc0fb
Signed-off-by: Claudio D. Gasparini <claudio.gasparini@pantheon.tech>
13 files changed:
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AdjRibInWriter.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeer.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/config/AppPeer.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/config/BgpPeer.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/PeerTest.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/SimpleSessionListener.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/StrictBGPPeerRegistryTest.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/BGPSessionListener.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/Peer.java
bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/TestingListener.java
bgp/topology-provider/src/main/java/org/opendaylight/bgpcep/bgp/topology/provider/AbstractTopologyBuilder.java
bgp/topology-provider/src/main/java/org/opendaylight/bgpcep/bgp/topology/provider/config/TopologyReferenceSingletonServiceImpl.java

index e9a92b71a369032aaeaf8a81f8bbad2e6879fa79..e6761de07ea55e4122a78bcd52cb4a1cb0227cfa 100644 (file)
@@ -11,8 +11,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -247,17 +249,25 @@ final class AdjRibInWriter {
         return pb.build();
     }
 
-    void removePeer() {
+    ListenableFuture<Void> removePeer() {
         if(this.peerPath != null) {
             final DOMDataWriteTransaction tx = this.chain.newWriteOnlyTransaction();
             tx.delete(LogicalDatastoreType.OPERATIONAL, this.peerPath);
+            final CheckedFuture<Void, TransactionCommitFailedException> future = tx.submit();
+            Futures.addCallback(future, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void result) {
+                    LOG.debug("Peer {} removed", AdjRibInWriter.this.peerPath);
+                }
 
-            try {
-                tx.submit().checkedGet();
-            } catch (final TransactionCommitFailedException e) {
-                LOG.debug("Failed to remove Peer {}", this.peerPath, e);
-            }
+                @Override
+                public void onFailure(final Throwable t) {
+                    LOG.warn("Failed to remove Peer {}", AdjRibInWriter.this.peerPath, t);
+                }
+            });
+            return future;
         }
+        return Futures.immediateFuture(null);
     }
 
     void markTableUptodate(final TablesKey tableTypes) {
index c1fb365ef798055309e1d830a568f8c80c5d2ce0..14bcb2b0f886928e8f87d20dd2207c8f44d05e74 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.protocol.bgp.rib.impl;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -66,7 +68,7 @@ import org.slf4j.LoggerFactory;
  * For purposed of import policies such as Best Path Selection, application
  * peer needs to have a BGP-ID that is configurable.
  */
-public class ApplicationPeer implements AutoCloseable, org.opendaylight.protocol.bgp.rib.spi.Peer, ClusteredDOMDataTreeChangeListener, TransactionChainListener {
+public class ApplicationPeer implements org.opendaylight.protocol.bgp.rib.spi.Peer, ClusteredDOMDataTreeChangeListener, TransactionChainListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationPeer.class);
 
@@ -248,8 +250,9 @@ public class ApplicationPeer implements AutoCloseable, org.opendaylight.protocol
         return this.name;
     }
 
+    // FIXME ListenableFuture<?> should be used once closeServiceInstance uses wildcard too
     @Override
-    public synchronized void close() {
+    public synchronized ListenableFuture<Void> close() {
         if (this.registration != null) {
             this.registration.close();
             this.registration = null;
@@ -257,8 +260,11 @@ public class ApplicationPeer implements AutoCloseable, org.opendaylight.protocol
         if (this.effectiveRibInWriter != null) {
             this.effectiveRibInWriter.close();
         }
+        final ListenableFuture<Void> future;
         if (this.adjRibInWriter != null) {
-            this.adjRibInWriter.removePeer();
+            future = this.adjRibInWriter.removePeer();
+        }else {
+            future = Futures.immediateFuture(null);
         }
         if (this.chain != null) {
             this.chain.close();
@@ -271,6 +277,7 @@ public class ApplicationPeer implements AutoCloseable, org.opendaylight.protocol
         if (this.moduleTracker != null) {
             this.moduleTracker.onInstanceClose();
         }
+        return future;
     }
 
     @Override
index 230ef3029ce22a60aa64bb4ba796bccba9e24861..4cbcd0cef849d7c73af7e4259ba06e723b38b0ff 100644 (file)
@@ -15,6 +15,8 @@ import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -93,7 +95,7 @@ import org.slf4j.LoggerFactory;
  * Class representing a peer. We have a single instance for each peer, which provides translation from BGP events into
  * RIB actions.
  */
-public class BGPPeer implements BGPSessionListener, Peer, AutoCloseable, BGPPeerRuntimeMXBean, TransactionChainListener {
+public class BGPPeer implements BGPSessionListener, Peer, BGPPeerRuntimeMXBean, TransactionChainListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(BGPPeer.class);
 
@@ -140,13 +142,15 @@ public class BGPPeer implements BGPSessionListener, Peer, AutoCloseable, BGPPeer
     public void instantiateServiceInstance() {
         // add current peer to "configured BGP peer" stats
         this.rib.getRenderStats().getConfiguredPeerCounter().increaseCount();
-        this.ribWriter = AdjRibInWriter.create(rib.getYangRibId(), this.peerRole, this.simpleRoutingPolicy, this.chain);
+        this.ribWriter = AdjRibInWriter.create(this.rib.getYangRibId(), this.peerRole, this.simpleRoutingPolicy, this.chain);
     }
 
+    // FIXME ListenableFuture<?> should be used once closeServiceInstance uses wildcard too
     @Override
-    public synchronized void close() {
-        releaseConnection();
+    public synchronized ListenableFuture<Void> close() {
+        final ListenableFuture<Void> future = releaseConnection();
         this.chain.close();
+        return future;
     }
 
     @Override
@@ -357,17 +361,18 @@ public class BGPPeer implements BGPSessionListener, Peer, AutoCloseable, BGPPeer
         }
     }
 
-    private void cleanup() {
+    private ListenableFuture<Void> cleanup() {
         // FIXME: BUG-196: support graceful
         this.adjRibOutListenerSet.values().forEach(AdjRibOutListener::close);
         this.adjRibOutListenerSet.clear();
         if (this.effRibInWriter != null) {
             this.effRibInWriter.close();
         }
-        if(this.ribWriter != null) {
-            this.ribWriter.removePeer();
-        }
         this.tables.clear();
+        if (this.ribWriter != null) {
+            return this.ribWriter.removePeer();
+        }
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -403,14 +408,14 @@ public class BGPPeer implements BGPSessionListener, Peer, AutoCloseable, BGPPeer
     }
 
     @Override
-    @GuardedBy("this")
-    public synchronized void releaseConnection() {
+    public synchronized ListenableFuture<Void> releaseConnection() {
         if (this.rpcRegistration != null) {
             this.rpcRegistration.close();
         }
         closeRegistration();
-        cleanup();
+        final ListenableFuture<Void> future = cleanup();
         dropConnection();
+        return future;
     }
 
     private void closeRegistration() {
index 580900bff81560f772faa0d2ea7eeaaad440e9dc..70156ccad6ea537ab453659ccb18e9492b6c8d56 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.protocol.bgp.rib.impl.config;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Objects;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -124,8 +123,7 @@ public final class AppPeer implements PeerBean {
         @Override
         public ListenableFuture<Void> closeServiceInstance() {
             LOG.info("Application Peer Singleton Service {} instance closed", getIdentifier());
-            this.applicationPeer.close();
-            return Futures.immediateFuture(null);
+            return this.applicationPeer.close();
         }
 
         @Override
index 3af961fe6d8777a9710ff78c839f6f944093bcb4..d0365212a47a4eac6c80ecb0de5fe8a4cbfd08b4 100644 (file)
@@ -15,7 +15,6 @@ import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUti
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.concurrent.Future;
 import java.net.InetSocketAddress;
@@ -258,11 +257,11 @@ public final class BgpPeer implements PeerBean, BGPPeerRuntimeMXBean {
                 this.connection.cancel(true);
                 this.connection = null;
             }
-            this.bgpPeer.close();
+            final ListenableFuture<Void> future = this.bgpPeer.close();
             if(BgpPeer.this.currentConfiguration != null) {
                 BgpPeer.this.peerRegistry.removePeer(BgpPeer.this.currentConfiguration.getNeighborAddress());
             }
-            return Futures.immediateFuture(null);
+            return future;
         }
 
         @Override
index 4365e26f70c72ca836c961f3237e872752bdb202..b7d07ebf8d016fbf8efe0762d78c8715f54d95b4 100644 (file)
@@ -158,7 +158,7 @@ public class PeerTest extends AbstractRIBTestSetup {
         try {
             this.classic.onMessage(this.session, ub.build());
             fail();
-        } catch (BGPDocumentedException e) {
+        } catch (final BGPDocumentedException e) {
             assertEquals(BGPError.MANDATORY_ATTR_MISSING_MSG + "LOCAL_PREF", e.getMessage());
             assertEquals(BGPError.WELL_KNOWN_ATTR_MISSING.getCode(), e.getError().getCode());
             assertEquals(BGPError.WELL_KNOWN_ATTR_MISSING.getSubcode(), e.getError().getSubcode());
@@ -172,18 +172,17 @@ public class PeerTest extends AbstractRIBTestSetup {
         assertEquals(3, this.routes.size());
 
         //create new peer so that it gets advertized routes from RIB
-        try (final BGPPeer testingPeer = new BGPPeer("testingPeer", getRib(), PeerRole.Ibgp, null)) {
-            testingPeer.instantiateServiceInstance();
-            testingPeer.onSessionUp(this.session);
-            assertEquals(3, this.routes.size());
-            assertEquals(1, testingPeer.getBgpPeerState().getSessionEstablishedCount().getValue().intValue());
-            final List<RouteTable> routeTables = testingPeer.getBgpPeerState().getRouteTable();
-            assertEquals(1, routeTables.size());
-            final RouteTable routeTable = routeTables.get(0);
-            assertEquals(AFI_QNAME.toString(), routeTable.getAfi().getqNameOfIdentity());
-            assertEquals(SAFI_QNAME.toString(), routeTable.getSafi().getqNameOfIdentity());
-            assertNotNull(testingPeer.getBgpSessionState());
-        }
+        final BGPPeer testingPeer = new BGPPeer("testingPeer", getRib(), PeerRole.Ibgp, null);
+        testingPeer.instantiateServiceInstance();
+        testingPeer.onSessionUp(this.session);
+        assertEquals(3, this.routes.size());
+        assertEquals(1, testingPeer.getBgpPeerState().getSessionEstablishedCount().getValue().intValue());
+        final List<RouteTable> routeTables = testingPeer.getBgpPeerState().getRouteTable();
+        assertEquals(1, routeTables.size());
+        final RouteTable routeTable = routeTables.get(0);
+        assertEquals(AFI_QNAME.toString(), routeTable.getAfi().getqNameOfIdentity());
+        assertEquals(SAFI_QNAME.toString(), routeTable.getSafi().getqNameOfIdentity());
+        assertNotNull(testingPeer.getBgpSessionState());
 
         final List<Ipv4Prefix> prefs2 = Lists.newArrayList(new Ipv4Prefix("8.0.1.0/28"), new Ipv4Prefix("8.0.1.16/28"));
         ub.setNlri(new NlriBuilder().setNlri(prefs2).build());
index 6e8f867adfe2978f522b1f065cb5bab19f6d19f8..d2e33cbe5eae9332b64c24e3223db177b7e26489 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.protocol.bgp.rib.impl;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -67,7 +69,7 @@ public final class SimpleSessionListener implements BGPSessionListener {
     }
 
     @Override
-    public void releaseConnection() {
+    public ListenableFuture<Void> releaseConnection() {
         LOG.debug("Releasing connection");
         if (this.session != null) {
             try {
@@ -76,6 +78,7 @@ public final class SimpleSessionListener implements BGPSessionListener {
                 LOG.warn("Error closing session", e);
             }
         }
+        return Futures.immediateFuture(null);
     }
 
     BGPSessionImpl.State getState() {
index 905c802e895e31d63def00d31130cebb9583a81c..8897945c085e2847003fc6a43c3e9bf79bda27c9 100644 (file)
@@ -14,6 +14,7 @@ import static org.junit.Assert.fail;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
@@ -67,7 +68,7 @@ public class StrictBGPPeerRegistryTest {
 
     private static BGPSessionListener getMockSession() {
         final BGPSessionListener mock = Mockito.mock(BGPSessionListener.class);
-        Mockito.doNothing().when(mock).releaseConnection();
+        Mockito.doReturn(Futures.immediateFuture(null)).when(mock).releaseConnection();
         return mock;
     }
 
index be65fb4e24e60cb3fb08dc8673b6b6ec6f722396..4b30eee531cf9dd5fc1d061b7bd8ae184b473131 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.protocol.bgp.rib.spi;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.EventListener;
 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
@@ -54,5 +55,5 @@ public interface BGPSessionListener extends EventListener {
      */
     void onMessage(BGPSession session, Notification notification) throws BGPDocumentedException;
 
-    void releaseConnection();
+    ListenableFuture<?> releaseConnection();
 }
index eb8ab087ad37df146009bf95d556030739e8b4ef..13ca868915b18ac7fdf732e351f92126564ae7f9 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.protocol.bgp.rib.spi;
 
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Marker interface identifying a BGP peer.
@@ -24,4 +25,11 @@ public interface Peer {
      * @return byte[] raw identifier
      */
     byte[] getRawIdentifier();
+
+    /**
+     *  Close Peers and performs asynchronously DS clean up
+     *
+     * @return future
+     */
+    ListenableFuture<?> close();
 }
index c022aad54975be854cf1449996f8d4a8f8f44efc..c2bb0b53e04e434b24c8bb8a35fbc4770b660119 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.protocol.bgp.testtool;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
 import org.opendaylight.protocol.bgp.rib.impl.BGPSessionImpl;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
@@ -71,8 +73,9 @@ final class TestingListener implements BGPSessionListener {
     }
 
     @Override
-    public void releaseConnection() {
+    public ListenableFuture<?> releaseConnection() {
         LOG.info("Client Listener: Connection released.");
+        return Futures.immediateFuture(null);
     }
 
     void printCount(final String localAddress) {
index de1070ab8344a34c251a9c5103d62bccc8f8bbab..7cc9079ec57feb33f16a94c85d7fd807be8ee7ba 100644 (file)
@@ -9,8 +9,10 @@ package org.opendaylight.bgpcep.bgp.topology.provider;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,7 +51,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractTopologyBuilder<T extends Route> implements AutoCloseable, ClusteredDataTreeChangeListener<T>, TopologyReference, TransactionChainListener {
+public abstract class AbstractTopologyBuilder<T extends Route> implements ClusteredDataTreeChangeListener<T>,
+    TopologyReference, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
     // we limit the listener reset interval to be 5 min at most
     private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
@@ -147,17 +150,17 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements AutoCl
         return this.topology;
     }
 
-    @Override
-    public final synchronized void close() {
+    public final synchronized ListenableFuture<Void> close() {
         if (this.closed) {
             LOG.trace("Transaction chain was already closed.");
-            return;
+            Futures.immediateFuture(null);
         }
         this.closed = true;
         LOG.info("Shutting down builder for {}", getInstanceIdentifier());
         unregisterDataChangeListener();
-        destroyOperationalTopology();
+        final ListenableFuture<Void> future = destroyOperationalTopology();
         destroyTransactionChain();
+        return future;
     }
 
     @Override
@@ -252,16 +255,25 @@ public abstract class AbstractTopologyBuilder<T extends Route> implements AutoCl
      * Destroy the current operational topology data. Note a valid transaction must be provided
      * @throws TransactionCommitFailedException
      */
-    private synchronized void destroyOperationalTopology() {
+    private synchronized ListenableFuture<Void> destroyOperationalTopology() {
         Preconditions.checkNotNull(this.chain, "A valid transaction chain must be provided.");
         final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
         trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
-        try {
-            trans.submit().checkedGet();
-        } catch (final TransactionCommitFailedException e) {
-            LOG.error("Unable to reset operational topology {} (transaction {})", this.topology, trans.getIdentifier(), e);
-        }
+        final CheckedFuture<Void, TransactionCommitFailedException> future = trans.submit();
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Operational topology removed {}", AbstractTopologyBuilder.this.topology);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Unable to reset operational topology {} (transaction {})",
+                    AbstractTopologyBuilder.this.topology, trans.getIdentifier(), t);
+            }
+        });
         clearTopology();
+        return future;
     }
 
     /**
index 655fd7705fd464bde83fd45caab8a371d2f819d5..fe4e35a9127b2922398e90b6b267d9c636bfa253 100644 (file)
@@ -59,8 +59,7 @@ final class TopologyReferenceSingletonServiceImpl implements TopologyReferenceSi
     @Override
     public ListenableFuture<Void> closeServiceInstance() {
         LOG.info("Close Topology Singleton Service {}", getIdentifier());
-        this.topologyBuilder.close();
-        return Futures.immediateFuture(null);
+        return this.topologyBuilder.close();
     }
 
     @Override